blob: 160a3d6316884be6f988e2dee27dd3ee6bfa13ee [file] [log] [blame]
// Copyright 2008 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.codereview.manager.unpack;
import com.google.codereview.internal.CompletePatchset.CompletePatchsetRequest;
import com.google.codereview.internal.CompletePatchset.CompletePatchsetResponse;
import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileRequest;
import com.google.codereview.internal.UploadPatchsetFile.UploadPatchsetFileResponse;
import com.google.codereview.manager.Backend;
import com.google.codereview.manager.StopProcessingException;
import com.google.codereview.rpc.SimpleController;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.spearce.jgit.lib.Constants;
import org.spearce.jgit.lib.ObjectId;
import org.spearce.jgit.lib.ObjectLoader;
import org.spearce.jgit.lib.Repository;
import org.spearce.jgit.revwalk.RevCommit;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.security.MessageDigest;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
class PatchSetUploader implements Runnable {
private static final Log LOG = LogFactory.getLog(PatchSetUploader.class);
private static final int MAX_DATA_SIZE = 1022 * 1024; // bytes
private static final String EMPTY_BLOB_ID;
private static final ByteString EMPTY_DEFLATE;
static {
final MessageDigest md = Constants.newMessageDigest();
md.update(Constants.encodeASCII("blob 0\0"));
EMPTY_BLOB_ID = ObjectId.fromRaw(md.digest()).name();
EMPTY_DEFLATE = deflate(new byte[0]);
}
private final Backend server;
private final Repository db;
private final RevCommit commit;
private final String commitName;
private final String patchsetKey;
private ByteString.Output compressedFilenames;
private Writer filenameOut;
PatchSetUploader(final Backend be, final Repository sourceRepo,
final RevCommit sourceCommit, final String destPatchsetKey) {
server = be;
db = sourceRepo;
commit = sourceCommit;
commitName = commit.getId().name();
patchsetKey = destPatchsetKey;
}
private String logkey() {
return db.getDirectory().getAbsolutePath() + " " + commitName;
}
public void run() {
LOG.debug(logkey() + " begin");
try {
runImpl();
} catch (RuntimeException e) {
LOG.fatal(logkey() + " failure", e);
} catch (Error e) {
LOG.fatal(logkey() + " failure", e);
}
}
private void runImpl() {
try {
compressedFilenames = ByteString.newOutput();
filenameOut =
new OutputStreamWriter(new DeflaterOutputStream(compressedFilenames,
new Deflater(Deflater.DEFAULT_COMPRESSION)), "UTF-8");
} catch (IOException e) {
LOG.error(logkey() + " cannot initialize filename compression", e);
return;
}
try {
final DiffReader dr = new DiffReader(db, commit);
try {
boolean first = true;
FileDiff file;
while ((file = dr.next()) != null) {
storeOneDiff(file);
if (first) {
first = false;
} else {
filenameOut.write('\0');
}
filenameOut.write(file.getFilename());
}
} finally {
dr.close();
}
filenameOut.close();
} catch (StopProcessingException halt) {
return;
} catch (IOException err) {
LOG.error(logkey() + " diff failed", err);
return;
}
final CompletePatchsetRequest.Builder req;
req = CompletePatchsetRequest.newBuilder();
req.setPatchsetKey(patchsetKey);
req.setCompressedFilenames(compressedFilenames.toByteString());
final SimpleController ctrl = new SimpleController();
server.getChangeService().completePatchset(ctrl, req.build(),
new RpcCallback<CompletePatchsetResponse>() {
public void run(final CompletePatchsetResponse rsp) {
LOG.debug(logkey() + " complete");
}
});
if (ctrl.failed()) {
final String why = ctrl.errorText();
LOG.error(logkey() + " completing failed: " + why);
}
}
private void storeOneDiff(final FileDiff diff) throws StopProcessingException {
final UploadPatchsetFileRequest req = toFileRequest(diff);
final SimpleController ctrl = new SimpleController();
server.getChangeService().uploadPatchsetFile(ctrl, req,
new RpcCallback<UploadPatchsetFileResponse>() {
public void run(final UploadPatchsetFileResponse rsp) {
final UploadPatchsetFileResponse.CodeType sc = rsp.getStatusCode();
final String fn = req.getFileName();
final String pk = req.getPatchsetKey();
if (sc == UploadPatchsetFileResponse.CodeType.CREATED) {
LOG.debug(logkey() + " uploaded " + fn);
} else if (sc == UploadPatchsetFileResponse.CodeType.CLOSED) {
ctrl.setFailed("patchset closed " + pk);
} else if (sc == UploadPatchsetFileResponse.CodeType.UNKNOWN_PATCHSET) {
ctrl.setFailed("patchset unknown " + pk);
} else if (sc == UploadPatchsetFileResponse.CodeType.PATCHING_ERROR) {
ctrl.setFailed("server cannot apply patch");
} else {
ctrl.setFailed("Unknown status " + sc.name() + " " + pk);
}
}
});
if (ctrl.failed()) {
final String fn = req.getFileName();
final String why = ctrl.errorText();
LOG.error(logkey() + " uploading " + fn + " failed: " + why);
throw new StopProcessingException(why);
}
}
private UploadPatchsetFileRequest toFileRequest(final FileDiff diff) {
final UploadPatchsetFileRequest.Builder req;
req = UploadPatchsetFileRequest.newBuilder();
req.setPatchsetKey(patchsetKey);
req.setFileName(diff.getFilename());
req.setStatus(diff.getStatus());
ByteString patchz = deflate(diff.getPatch());
if (!diff.isBinary() && !diff.isTruncated()) {
final ObjectId baseId = diff.getBaseId();
if (baseId == null || ObjectId.equals(baseId, ObjectId.zeroId())) {
req.setBaseId(EMPTY_BLOB_ID);
req.setBaseZ(EMPTY_DEFLATE);
} else {
try {
final ObjectLoader ldr = db.openBlob(baseId);
if (ldr == null) {
LOG.fatal(logkey() + " missing " + baseId.name());
throw new StopProcessingException("No " + baseId.name());
}
final ByteString basez = deflate(ldr.getCachedBytes());
if (basez.size() + patchz.size() > MAX_DATA_SIZE) {
diff.truncatePatch();
patchz = deflate(diff.getPatch());
} else {
req.setBaseId(baseId.name());
req.setBaseZ(basez);
}
} catch (IOException err) {
LOG.fatal(logkey() + " cannot read base " + baseId.name(), err);
throw new StopProcessingException("No " + baseId.name());
}
}
}
if (!diff.isBinary() && !diff.isTruncated()) {
final ObjectId finalId = diff.getFinalId();
if (finalId == null || ObjectId.equals(finalId, ObjectId.zeroId())) {
req.setFinalId(EMPTY_BLOB_ID);
} else {
req.setFinalId(finalId.name());
}
}
req.setPatchZ(patchz);
req.setPatchId(hashOfInflated(patchz));
return req.build();
}
private static ByteString deflate(final byte[] buf) {
final ByteString.Output r = ByteString.newOutput();
final DeflaterOutputStream out = new DeflaterOutputStream(r);
try {
out.write(buf);
out.close();
} catch (IOException err) {
// This should not happen.
throw new StopProcessingException("Unexpected IO error", err);
}
return r.toByteString();
}
private static String hashOfInflated(final ByteString in) {
final MessageDigest md = Constants.newMessageDigest();
final byte[] tmp = new byte[512];
final InflaterInputStream iis = new InflaterInputStream(in.newInput());
int cnt;
try {
while ((cnt = iis.read(tmp)) > 0) {
md.update(tmp, 0, cnt);
}
} catch (IOException err) {
// This should not happen.
throw new StopProcessingException("Unexpected IO error", err);
}
return ObjectId.fromRaw(md.digest()).name();
}
}