/*
* Copyright (C) 2011, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package org.eclipse.jgit.storage.dht;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.eclipse.jgit.lib.Constants.OBJ_COMMIT;
import static org.eclipse.jgit.lib.Constants.OBJ_OFS_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_REF_DELTA;
import static org.eclipse.jgit.lib.Constants.OBJ_TAG;
import static org.eclipse.jgit.lib.Constants.OBJ_TREE;
import static org.eclipse.jgit.storage.dht.ChunkInfo.OBJ_MIXED;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.eclipse.jgit.generated.storage.dht.proto.GitStore;
import org.eclipse.jgit.generated.storage.dht.proto.GitStore.CachedPackInfo;
import org.eclipse.jgit.generated.storage.dht.proto.GitStore.ChunkMeta;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.MutableObjectId;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdSubclassMap;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.storage.dht.spi.Context;
import org.eclipse.jgit.storage.dht.spi.Database;
import org.eclipse.jgit.storage.dht.spi.WriteBuffer;
import org.eclipse.jgit.storage.file.PackLock;
import org.eclipse.jgit.transport.PackParser;
import org.eclipse.jgit.transport.PackedObjectInfo;
import org.eclipse.jgit.treewalk.CanonicalTreeParser;
import org.eclipse.jgit.util.LongList;
import com.google.protobuf.ByteString;
/**
* Parses the pack stream into chunks, and indexes the chunks for lookup.
*
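* <p>
* The constructor is package-private; a parser is normally obtained through
* {@code ObjectInserter#newPackParser(InputStream)} on a DHT-backed
* repository's inserter. A minimal usage sketch (assuming a DHT-backed
* {@code Repository repo} and an {@code InputStream in} carrying the pack
* data; the names are illustrative):
*
* <pre>
* ObjectInserter ins = repo.newObjectInserter();
* try {
*   PackParser parser = ins.newPackParser(in);
*   parser.parse(NullProgressMonitor.INSTANCE);
*   ins.flush();
* } finally {
*   ins.release();
* }
* </pre>
*/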
public class DhtPackParser extends PackParser {
private final DhtObjDatabase objdb;
private final RepositoryKey repo;
private final Database db;
private final DhtInserterOptions options;
private final MessageDigest chunkKeyDigest;
/** Number of objects to write to the global index at once. */
private final int linkBatchSize;
private Boolean saveAsCachedPack;
private WriteBuffer dbWriteBuffer;
/** Chunk writers for the 4 major object types, keyed by object type code. */
private ChunkFormatter[] openChunks;
/** Edges for current chunks. */
private Edges[] openEdges;
/** Prior chunks that were written, keyed by object type code. */
private List<ChunkKey>[] chunkByOrder;
/** Information on chunks already written out. */
private Map<ChunkKey, ChunkInfo> infoByKey;
/** Metadata (fragment lists, prefetch hints) for chunks already written out. */
private Map<ChunkKey, ChunkMeta> chunkMeta;
/** ChunkMeta that needs to be written out again, as it was modified. */
private Map<ChunkKey, ChunkMeta> dirtyMeta;
private Map<ChunkKey, Edges> chunkEdges;
// Correlated lists, sorted by object stream position.
private LongList objStreamPos;
private LongList objChunkPtrs;
/** Formatter handling the current object's data stream. */
private ChunkFormatter currChunk;
/** Current type of the object, if known. */
private int currType;
/** Position of the current object in the chunks we create. */
private long currChunkPtr;
/** If using OFS_DELTA, location of the base object in chunk space. */
private long currBasePtr;
/** Starting byte of the object data (aka end of the object header). */
private int currDataPos;
/** Total number of bytes in the object representation. */
private long currPackedSize;
/** Total number of bytes in the entire inflated object. */
private long currInflatedSize;
/** If the current object is fragmented, the list of chunks holding it. */
private List<ChunkKey> currFragments;
/** Previously written chunk that is being re-read during delta resolution. */
private PackChunk dbChunk;
/** Current read position in {@link #dbChunk}. */
private int dbPtr;
/** Recent chunks that were written, or recently read. */
private LinkedHashMap<ChunkKey, PackChunk> chunkReadBackCache;
/** Objects parsed from the stream, sorted by SHA-1. */
private List<DhtInfo> objectListByName;
/** Objects parsed from the stream, sorted by chunk (aka offset). */
private List<DhtInfo> objectListByChunk;
/** Iterators to write {@link #objectListByName} into the global index. */
private ListIterator<DhtInfo>[] linkIterators;
/** If the pack stream was self-contained, the cached pack info record key. */
private CachedPackKey cachedPackKey;
private CanonicalTreeParser treeParser;
private final MutableObjectId idBuffer;
private ObjectIdSubclassMap<DhtInfo> objectMap;
DhtPackParser(DhtObjDatabase objdb, InputStream in) {
super(objdb, in);
// Disable collision checking. DhtReader looks only at pre-existing
// objects, so a colliding replacement will be ignored until it is
// removed during garbage collection.
//
setCheckObjectCollisions(false);
this.objdb = objdb;
this.repo = objdb.getRepository().getRepositoryKey();
this.db = objdb.getDatabase();
this.options = objdb.getInserterOptions();
this.chunkKeyDigest = Constants.newMessageDigest();
dbWriteBuffer = db.newWriteBuffer();
openChunks = new ChunkFormatter[5];
openEdges = new Edges[5];
chunkByOrder = newListArray(5);
infoByKey = new HashMap<ChunkKey, ChunkInfo>();
dirtyMeta = new HashMap<ChunkKey, ChunkMeta>();
chunkMeta = new HashMap<ChunkKey, ChunkMeta>();
chunkEdges = new HashMap<ChunkKey, Edges>();
treeParser = new CanonicalTreeParser();
idBuffer = new MutableObjectId();
objectMap = new ObjectIdSubclassMap<DhtInfo>();
final int max = options.getParserCacheSize();
chunkReadBackCache = new LinkedHashMap<ChunkKey, PackChunk>(max, 0.75f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Entry<ChunkKey, PackChunk> e) {
return max < size();
}
};
// The typical WriteBuffer flushes at 512 KiB increments, and
// the typical ObjectInfo record is around 180 bytes. Use these
// figures to come up with a rough estimate for how many links
// to construct in one region of the DHT before moving on to a
// different region, in order to increase parallelism on large
// object imports.
//
linkBatchSize = 512 * 1024 / 180;
}
@SuppressWarnings("unchecked")
private static <T> List<T>[] newListArray(int size) {
return new List[size];
}
/** @return true if the pack stream will be saved as a cached pack. */
public boolean isSaveAsCachedPack() {
return saveAsCachedPack != null && saveAsCachedPack.booleanValue();
}
/**
* Enable saving the pack stream as a cached pack.
*
* @param save
* if true, the stream is saved.
*/
public void setSaveAsCachedPack(boolean save) {
saveAsCachedPack = Boolean.valueOf(save);
}
@Override
public PackLock parse(ProgressMonitor receiving, ProgressMonitor resolving)
throws IOException {
boolean success = false;
try {
PackLock lock = super.parse(receiving, resolving);
chunkReadBackCache = null;
openChunks = null;
openEdges = null;
treeParser = null;
final int objCnt = getObjectCount();
if (objCnt == 0) {
// If no objects were received, no chunks were created. Leaving
// success as false so the finally block performs a rollback is a
// good way to make sure nothing was left behind.
//
return lock;
}
createObjectLists();
if (isSaveAsCachedPack())
putCachedPack();
computeChunkEdges();
putChunkIndexes();
putDirtyMeta();
chunkMeta = null;
chunkEdges = null;
dirtyMeta = null;
objectMap = null;
objectListByChunk = null;
dbWriteBuffer.flush();
putGlobalIndex(resolving);
dbWriteBuffer.flush();
success = true;
return lock;
} finally {
openChunks = null;
openEdges = null;
objStreamPos = null;
objChunkPtrs = null;
currChunk = null;
currFragments = null;
dbChunk = null;
chunkReadBackCache = null;
infoByKey = null;
chunkMeta = null;
chunkEdges = null;
treeParser = null;
if (!success)
rollback();
chunkByOrder = null;
objectListByName = null;
objectListByChunk = null;
linkIterators = null;
dbWriteBuffer = null;
}
}
@SuppressWarnings("unchecked")
private void createObjectLists() {
List objs = getSortedObjectList(null /* by name */);
objectListByName = objs;
int cnt = objectListByName.size();
DhtInfo[] copy = objectListByName.toArray(new DhtInfo[cnt]);
Arrays.sort(copy, new Comparator<PackedObjectInfo>() {
public int compare(PackedObjectInfo o1, PackedObjectInfo o2) {
DhtInfo a = (DhtInfo) o1;
DhtInfo b = (DhtInfo) o2;
return Long.signum(a.chunkPtr - b.chunkPtr);
}
});
objectListByChunk = Arrays.asList(copy);
}
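// Builds the CachedPackInfo record for this pack: the tip objects (those
// not referenced by any other object in the stream), the ordered chunk
// lists per object type, and aggregate byte/object counts. If the stream
// was not self-contained (some referenced object is not in the pack),
// nothing is saved.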
private void putCachedPack() throws DhtException {
CachedPackInfo.Builder info = CachedPackInfo.newBuilder();
for (DhtInfo obj : objectMap) {
if (!obj.isInPack())
return;
if (!obj.isReferenced())
info.getTipListBuilder().addObjectName(obj.name());
}
MessageDigest version = Constants.newMessageDigest();
addChunkList(info, version, chunkByOrder[OBJ_TAG]);
addChunkList(info, version, chunkByOrder[OBJ_COMMIT]);
addChunkList(info, version, chunkByOrder[OBJ_TREE]);
addChunkList(info, version, chunkByOrder[OBJ_BLOB]);
info.setName(computePackName().name());
info.setVersion(ObjectId.fromRaw(version.digest()).name());
cachedPackKey = CachedPackKey.fromInfo(info.build());
for (List<ChunkKey> list : chunkByOrder) {
if (list == null)
continue;
for (ChunkKey key : list) {
ChunkInfo oldInfo = infoByKey.get(key);
GitStore.ChunkInfo.Builder b =
GitStore.ChunkInfo.newBuilder(oldInfo.getData());
b.setCachedPackKey(cachedPackKey.asString());
ChunkInfo newInfo = new ChunkInfo(key, b.build());
infoByKey.put(key, newInfo);
// A fragment was already put, and has to be re-put.
// Non-fragments will be put later and do not need to be put now.
if (newInfo.getData().getIsFragment())
db.repository().put(repo, newInfo, dbWriteBuffer);
}
}
db.repository().put(repo, info.build(), dbWriteBuffer);
}
private void addChunkList(CachedPackInfo.Builder info,
MessageDigest version, List<ChunkKey> list) {
if (list == null)
return;
long bytesTotal = info.getBytesTotal();
long objectsTotal = info.getObjectsTotal();
long objectsDelta = info.getObjectsDelta();
byte[] buf = new byte[Constants.OBJECT_ID_LENGTH];
for (ChunkKey key : list) {
ChunkInfo chunkInfo = infoByKey.get(key);
GitStore.ChunkInfo c = chunkInfo.getData();
int len = c.getChunkSize() - ChunkFormatter.TRAILER_SIZE;
bytesTotal += len;
objectsTotal += c.getObjectCounts().getTotal();
objectsDelta += c.getObjectCounts().getOfsDelta();
objectsDelta += c.getObjectCounts().getRefDelta();
info.getChunkListBuilder().addChunkKey(
chunkInfo.getChunkKey().asString());
chunkInfo.getChunkKey().getChunkHash().copyRawTo(buf, 0);
version.update(buf);
}
info.setBytesTotal(bytesTotal);
info.setObjectsTotal(objectsTotal);
info.setObjectsDelta(objectsDelta);
}
private ObjectId computePackName() {
byte[] buf = new byte[Constants.OBJECT_ID_LENGTH];
MessageDigest md = Constants.newMessageDigest();
for (DhtInfo otp : objectListByName) {
otp.copyRawTo(buf, 0);
md.update(buf);
}
return ObjectId.fromRaw(md.digest());
}
private void rollback() throws DhtException {
try {
dbWriteBuffer.abort();
dbWriteBuffer = db.newWriteBuffer();
if (cachedPackKey != null)
db.repository().remove(repo, cachedPackKey, dbWriteBuffer);
if (linkIterators != null) {
boolean removed = true;
while (removed) {
removed = false;
for (ListIterator<DhtInfo> itr : linkIterators) {
int cnt = 0;
while (itr.hasPrevious() && cnt < linkBatchSize) {
DhtInfo oe = itr.previous();
db.objectIndex().remove( //
ObjectIndexKey.create(repo, oe), //
chunkOf(oe.chunkPtr), //
dbWriteBuffer);
cnt++;
}
if (0 < cnt)
removed = true;
}
}
}
deleteChunks(chunkByOrder[OBJ_COMMIT]);
deleteChunks(chunkByOrder[OBJ_TREE]);
deleteChunks(chunkByOrder[OBJ_BLOB]);
deleteChunks(chunkByOrder[OBJ_TAG]);
dbWriteBuffer.flush();
} catch (Throwable err) {
throw new DhtException(DhtText.get().packParserRollbackFailed, err);
}
}
private void deleteChunks(List<ChunkKey> list) throws DhtException {
if (list != null) {
for (ChunkKey key : list) {
db.chunk().remove(key, dbWriteBuffer);
db.repository().remove(repo, key, dbWriteBuffer);
}
}
}
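// Writes one link per parsed object into the global object index. The
// name-sorted object list is split into at most 32 segments and links are
// added in round-robin batches of linkBatchSize, spreading the writes
// across different regions of the DHT (see the linkBatchSize estimate in
// the constructor). The segment iterators are kept in linkIterators so
// rollback() can walk them backwards and remove exactly the links that
// were added.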
private void putGlobalIndex(ProgressMonitor pm) throws DhtException {
int objcnt = objectListByName.size();
pm.beginTask(DhtText.get().recordingObjects, objcnt);
int segments = Math.max(1, Math.min(objcnt / linkBatchSize, 32));
linkIterators = newListIteratorArray(segments);
int objsPerSegment = objcnt / segments;
int beginIdx = 0;
for (int i = 0; i < segments - 1; i++) {
int endIdx = Math.min(beginIdx + objsPerSegment, objcnt);
linkIterators[i] = objectListByName.subList(beginIdx, endIdx)
.listIterator();
beginIdx = endIdx;
}
linkIterators[segments - 1] = objectListByName
.subList(beginIdx, objcnt).listIterator();
boolean inserted = true;
while (inserted) {
inserted = false;
for (ListIterator<DhtInfo> itr : linkIterators) {
int cnt = 0;
while (itr.hasNext() && cnt < linkBatchSize) {
DhtInfo oe = itr.next();
db.objectIndex().add( //
ObjectIndexKey.create(repo, oe), //
oe.info(chunkOf(oe.chunkPtr)), //
dbWriteBuffer);
cnt++;
}
if (0 < cnt) {
pm.update(cnt);
inserted = true;
}
}
}
pm.endTask();
}
@SuppressWarnings("unchecked")
private static ListIterator<DhtInfo>[] newListIteratorArray(int size) {
return new ListIterator[size];
}
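// Walks objectListByChunk (sorted by chunk pointer) and hands each run of
// objects that landed in the same chunk to computeEdges(), together with
// the chunk's object type, or OBJ_MIXED if more than one type ended up in
// the same chunk.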
private void computeChunkEdges() throws DhtException {
List<DhtInfo> objs = objectListByChunk;
int beginIdx = 0;
ChunkKey key = chunkOf(objs.get(0).chunkPtr);
int type = typeOf(objs.get(0).chunkPtr);
int objIdx = 1;
for (; objIdx < objs.size(); objIdx++) {
DhtInfo oe = objs.get(objIdx);
ChunkKey oeKey = chunkOf(oe.chunkPtr);
if (!key.equals(oeKey)) {
computeEdges(objs.subList(beginIdx, objIdx), key, type);
beginIdx = objIdx;
key = oeKey;
type = typeOf(oe.chunkPtr);
}
if (type != OBJ_MIXED && type != typeOf(oe.chunkPtr))
type = OBJ_MIXED;
}
computeEdges(objs.subList(beginIdx, objs.size()), key, type);
}
private void computeEdges(List<DhtInfo> objs, ChunkKey key, int type)
throws DhtException {
Edges edges = chunkEdges.get(key);
if (edges == null)
return;
for (DhtInfo obj : objs)
edges.remove(obj);
switch (type) {
case OBJ_COMMIT:
edges.commitEdges = toChunkList(edges.commitIds);
break;
case OBJ_TREE:
// TODO prefetch tree edges
break;
}
edges.commitIds = null;
}
private List<ChunkKey> toChunkList(Set<DhtInfo> objects)
throws DhtException {
if (objects == null || objects.isEmpty())
return null;
Map<ChunkKey, ChunkOrderingEntry> map = new HashMap<ChunkKey, ChunkOrderingEntry>();
for (DhtInfo obj : objects) {
if (!obj.isInPack())
continue;
long chunkPtr = obj.chunkPtr;
ChunkKey key = chunkOf(chunkPtr);
ChunkOrderingEntry e = map.get(key);
if (e == null) {
e = new ChunkOrderingEntry();
e.key = key;
e.order = chunkIdx(chunkPtr);
map.put(key, e);
} else {
e.order = Math.min(e.order, chunkIdx(chunkPtr));
}
}
ChunkOrderingEntry[] tmp = map.values().toArray(
new ChunkOrderingEntry[map.size()]);
Arrays.sort(tmp);
ChunkKey[] out = new ChunkKey[tmp.length];
for (int i = 0; i < tmp.length; i++)
out[i] = tmp[i].key;
return Arrays.asList(out);
}
private static final class ChunkOrderingEntry implements
Comparable<ChunkOrderingEntry> {
ChunkKey key;
int order;
public int compareTo(ChunkOrderingEntry o) {
return order - o.order;
}
}
private void putChunkIndexes() throws DhtException {
List<DhtInfo> objs = objectListByChunk;
int sIdx = 0;
DhtInfo oe = objs.get(0);
oe.setOffset(offsetOf(oe.chunkPtr));
ChunkKey key = chunkOf(oe.chunkPtr);
int type = typeOf(oe.chunkPtr);
int objIdx = 1;
for (; objIdx < objs.size(); objIdx++) {
oe = objs.get(objIdx);
oe.setOffset(offsetOf(oe.chunkPtr));
ChunkKey oeKey = chunkOf(oe.chunkPtr);
if (!key.equals(oeKey)) {
putChunkIndex(objs.subList(sIdx, objIdx), key, type);
sIdx = objIdx;
key = oeKey;
type = typeOf(oe.chunkPtr);
}
if (type != OBJ_MIXED && type != typeOf(oe.chunkPtr))
type = OBJ_MIXED;
}
putChunkIndex(objs.subList(sIdx, objs.size()), key, type);
}
private void putChunkIndex(List<DhtInfo> objectList, ChunkKey key, int type)
throws DhtException {
ChunkInfo oldInfo = infoByKey.get(key);
GitStore.ChunkInfo.Builder info
= GitStore.ChunkInfo.newBuilder(oldInfo.getData());
PackChunk.Members builder = new PackChunk.Members();
builder.setChunkKey(key);
byte[] index = ChunkIndex.create(objectList);
info.setIndexSize(index.length);
builder.setChunkIndex(index);
ChunkMeta meta = dirtyMeta.remove(key);
if (meta == null)
meta = chunkMeta.get(key);
switch (type) {
case OBJ_COMMIT: {
Edges edges = chunkEdges.get(key);
List<ChunkKey> e = edges != null ? edges.commitEdges : null;
List<ChunkKey> s = sequentialHint(key, OBJ_COMMIT);
if (e == null)
e = Collections.emptyList();
if (s == null)
s = Collections.emptyList();
if (!e.isEmpty() || !s.isEmpty()) {
ChunkMeta.Builder m = edit(meta);
ChunkMeta.PrefetchHint.Builder h = m.getCommitPrefetchBuilder();
for (ChunkKey k : e)
h.addEdge(k.asString());
for (ChunkKey k : s)
h.addSequential(k.asString());
meta = m.build();
}
break;
}
case OBJ_TREE: {
List<ChunkKey> s = sequentialHint(key, OBJ_TREE);
if (s == null)
s = Collections.emptyList();
if (!s.isEmpty()) {
ChunkMeta.Builder m = edit(meta);
ChunkMeta.PrefetchHint.Builder h = m.getTreePrefetchBuilder();
for (ChunkKey k : s)
h.addSequential(k.asString());
meta = m.build();
}
break;
}
}
if (meta != null) {
info.setMetaSize(meta.getSerializedSize());
builder.setMeta(meta);
}
ChunkInfo newInfo = new ChunkInfo(key, info.build());
infoByKey.put(key, newInfo);
db.repository().put(repo, newInfo, dbWriteBuffer);
db.chunk().put(builder, dbWriteBuffer);
}
private static ChunkMeta.Builder edit(ChunkMeta meta) {
if (meta != null)
return ChunkMeta.newBuilder(meta);
return ChunkMeta.newBuilder();
}
private List<ChunkKey> sequentialHint(ChunkKey key, int typeCode) {
List<ChunkKey> all = chunkByOrder[typeCode];
if (all == null)
return null;
int idx = all.indexOf(key);
if (0 <= idx) {
int max = options.getPrefetchDepth();
int end = Math.min(idx + 1 + max, all.size());
return all.subList(idx + 1, end);
}
return null;
}
private void putDirtyMeta() throws DhtException {
for (Map.Entry<ChunkKey, ChunkMeta> meta : dirtyMeta.entrySet()) {
PackChunk.Members builder = new PackChunk.Members();
builder.setChunkKey(meta.getKey());
builder.setMeta(meta.getValue());
db.chunk().put(builder, dbWriteBuffer);
}
}
@Override
protected PackedObjectInfo newInfo(AnyObjectId id, UnresolvedDelta delta,
ObjectId baseId) {
DhtInfo obj = objectMap.addIfAbsent(new DhtInfo(id));
if (delta != null) {
DhtDelta d = (DhtDelta) delta;
obj.chunkPtr = d.chunkPtr;
obj.packedSize = d.packedSize;
obj.inflatedSize = d.inflatedSize;
obj.base = baseId;
obj.setType(d.getType());
if (d.isFragmented())
obj.setFragmented();
}
return obj;
}
@Override
protected void onPackHeader(long objCnt) throws IOException {
if (Integer.MAX_VALUE < objCnt) {
throw new DhtException(MessageFormat.format(
DhtText.get().tooManyObjectsInPack, Long.valueOf(objCnt)));
}
objStreamPos = new LongList((int) objCnt);
objChunkPtrs = new LongList((int) objCnt);
if (saveAsCachedPack == null)
setSaveAsCachedPack(1000 < objCnt);
}
@Override
protected void onBeginWholeObject(long streamPosition, int type,
long inflatedSize) throws IOException {
ChunkFormatter w = begin(type);
if (!w.whole(type, inflatedSize)) {
endChunk(type);
w = begin(type);
if (!w.whole(type, inflatedSize))
throw panicCannotInsert();
}
currType = type;
currDataPos = w.position();
currPackedSize = 0;
currInflatedSize = inflatedSize;
objStreamPos.add(streamPosition);
}
@Override
protected void onEndWholeObject(PackedObjectInfo info) throws IOException {
boolean fragmented = currFragments != null;
endOneObject();
DhtInfo oe = (DhtInfo) info;
oe.chunkPtr = currChunkPtr;
oe.packedSize = currPackedSize;
oe.inflatedSize = currInflatedSize;
oe.setType(currType);
if (fragmented)
oe.setFragmented();
}
private void endOneObject() throws DhtException {
if (currFragments != null)
endFragmentedObject();
objChunkPtrs.add(currChunkPtr);
}
@Override
protected void onBeginOfsDelta(long deltaPos, long basePos,
long inflatedSize) throws IOException {
long basePtr = objChunkPtrs.get(findStreamIndex(basePos));
int type = typeOf(basePtr);
currType = type;
currPackedSize = 0;
currInflatedSize = inflatedSize;
currBasePtr = basePtr;
objStreamPos.add(deltaPos);
ChunkFormatter w = begin(type);
if (isInCurrentChunk(basePtr)) {
if (w.ofsDelta(inflatedSize, w.position() - offsetOf(basePtr))) {
currDataPos = w.position();
return;
}
endChunk(type);
w = begin(type);
}
if (!longOfsDelta(w, inflatedSize, basePtr)) {
endChunk(type);
w = begin(type);
if (!longOfsDelta(w, inflatedSize, basePtr))
throw panicCannotInsert();
}
currDataPos = w.position();
}
@Override
protected void onBeginRefDelta(long deltaPos, AnyObjectId baseId,
long inflatedSize) throws IOException {
// Try to get the base type, but only if the base was seen earlier in
// this pack stream. If not, assume the worst case of OBJ_BLOB.
//
int typeCode;
DhtInfo baseInfo = objectMap.get(baseId);
if (baseInfo != null && baseInfo.isInPack()) {
typeCode = baseInfo.getType();
currType = typeCode;
} else {
typeCode = OBJ_BLOB;
currType = -1;
}
ChunkFormatter w = begin(typeCode);
if (!w.refDelta(inflatedSize, baseId)) {
endChunk(typeCode);
w = begin(typeCode);
if (!w.refDelta(inflatedSize, baseId))
throw panicCannotInsert();
}
currDataPos = w.position();
currPackedSize = 0;
currInflatedSize = inflatedSize;
objStreamPos.add(deltaPos);
}
@Override
protected DhtDelta onEndDelta() throws IOException {
boolean fragmented = currFragments != null;
endOneObject();
DhtDelta delta = new DhtDelta();
delta.chunkPtr = currChunkPtr;
delta.packedSize = currPackedSize;
delta.inflatedSize = currInflatedSize;
if (0 < currType)
delta.setType(currType);
if (fragmented)
delta.setFragmented();
return delta;
}
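// Appends the object's data to the chunk that holds its header. If the
// data does not fit and the object is the only one in the chunk, the
// object switches to fragmented mode and spills across additional chunks.
// Otherwise the current chunk is closed without the partial object, a
// fresh chunk is opened, and the object's header plus the bytes copied so
// far are replayed into it before the new data is appended.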
@Override
protected void onObjectData(Source src, byte[] raw, int pos, int len)
throws IOException {
if (src != Source.INPUT)
return;
if (currChunk.append(raw, pos, len)) {
currPackedSize += len;
return;
}
if (currFragments == null && currChunk.getObjectCount() == 1)
currFragments = new LinkedList<ChunkKey>();
if (currFragments != null) {
appendToFragment(raw, pos, len);
return;
}
// Everything between dataPos and dataEnd must be saved.
//
final int dataPos = currDataPos;
final int dataEnd = currChunk.position();
final int hdrPos = offsetOf(currChunkPtr);
final int hdrLen = dataPos - hdrPos;
final int type = typeOf(currChunkPtr);
byte[] dataOld = currChunk.getRawChunkDataArray();
final int typeOld = currChunk.getCurrentObjectType();
currChunk.rollback();
endChunk(type);
final ChunkFormatter w = begin(type);
switch (typeOld) {
case OBJ_COMMIT:
case OBJ_BLOB:
case OBJ_TREE:
case OBJ_TAG:
case OBJ_REF_DELTA:
w.adjustObjectCount(1, typeOld);
if (!w.append(dataOld, hdrPos, hdrLen))
throw panicCannotInsert();
break;
case OBJ_OFS_DELTA:
if (!longOfsDelta(w, currInflatedSize, currBasePtr))
throw panicCannotInsert();
break;
default:
throw new DhtException("Internal programming error: " + typeOld);
}
currDataPos = w.position();
if (dataPos < dataEnd && !w.append(dataOld, dataPos, dataEnd - dataPos))
throw panicCannotInsert();
dataOld = null;
if (w.append(raw, pos, len)) {
currPackedSize += len;
} else {
currFragments = new LinkedList<ChunkKey>();
appendToFragment(raw, pos, len);
}
}
private boolean longOfsDelta(ChunkFormatter w, long infSize, long basePtr) {
final int type = typeOf(basePtr);
final List<ChunkKey> infoList = chunkByOrder[type];
final int baseIdx = chunkIdx(basePtr);
final ChunkInfo baseInfo = infoByKey.get(infoList.get(baseIdx));
// Go backwards to the start of the base's chunk.
long relativeChunkStart = 0;
for (int i = infoList.size() - 1; baseIdx <= i; i--) {
GitStore.ChunkInfo info = infoByKey.get(infoList.get(i)).getData();
int packSize = info.getChunkSize() - ChunkFormatter.TRAILER_SIZE;
relativeChunkStart += packSize;
}
// The offset to the base walks back to the start of our chunk, then
// back to the start of the base's chunk, and finally slides forward
// by the base's offset within its own chunk.
//
long ofs = w.position() + relativeChunkStart - offsetOf(basePtr);
if (w.ofsDelta(infSize, ofs)) {
w.useBaseChunk(relativeChunkStart, baseInfo.getChunkKey());
return true;
}
return false;
}
private void appendToFragment(byte[] raw, int pos, int len)
throws DhtException {
while (0 < len) {
if (currChunk.free() == 0) {
int typeCode = typeOf(currChunkPtr);
currChunk.setFragment();
currFragments.add(endChunk(typeCode));
currChunk = openChunk(typeCode);
}
int n = Math.min(len, currChunk.free());
currChunk.append(raw, pos, n);
currPackedSize += n;
pos += n;
len -= n;
}
}
private void endFragmentedObject() throws DhtException {
currChunk.setFragment();
ChunkKey lastKey = endChunk(typeOf(currChunkPtr));
if (lastKey != null)
currFragments.add(lastKey);
ChunkMeta.Builder protoBuilder = ChunkMeta.newBuilder();
for (ChunkKey key : currFragments)
protoBuilder.addFragment(key.asString());
ChunkMeta protoMeta = protoBuilder.build();
for (ChunkKey key : currFragments) {
ChunkMeta oldMeta = chunkMeta.get(key);
if (oldMeta != null) {
ChunkMeta.Builder newMeta = ChunkMeta.newBuilder(oldMeta);
newMeta.clearFragment();
newMeta.mergeFrom(protoMeta);
ChunkMeta meta = newMeta.build();
dirtyMeta.put(key, meta);
chunkMeta.put(key, meta);
} else {
dirtyMeta.put(key, protoMeta);
chunkMeta.put(key, protoMeta);
}
}
currFragments = null;
}
@Override
protected void onInflatedObjectData(PackedObjectInfo obj, int typeCode,
byte[] data) throws IOException {
DhtInfo info = (DhtInfo) obj;
info.inflatedSize = data.length;
info.setType(typeCode);
switch (typeCode) {
case OBJ_COMMIT:
onCommit(info, data);
break;
case OBJ_TREE:
onTree(info, data);
break;
case OBJ_TAG:
onTag(info, data);
break;
}
}
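// Scans the canonical commit text for object references. "tree " is 5
// bytes, so the tree SHA-1 begins at offset 5; the tree line is 46 bytes
// ("tree " + 40 hex digits + '\n'), so the first "parent " line, if any,
// starts at offset 46. Each parent line is 48 bytes, with its SHA-1 at
// offset +7 from the start of the line.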
private void onCommit(DhtInfo obj, byte[] raw) throws DhtException {
Edges edges = edges(obj.chunkPtr);
edges.remove(obj);
// TODO compute hints for trees.
if (isSaveAsCachedPack()) {
idBuffer.fromString(raw, 5);
lookupByName(idBuffer).setReferenced();
}
int ptr = 46;
while (raw[ptr] == 'p') {
idBuffer.fromString(raw, ptr + 7);
DhtInfo p = lookupByName(idBuffer);
p.setReferenced();
edges.commit(p);
ptr += 48;
}
}
private void onTree(DhtInfo obj, byte[] data) {
if (isSaveAsCachedPack()) {
treeParser.reset(data);
while (!treeParser.eof()) {
idBuffer.fromRaw(treeParser.idBuffer(), treeParser.idOffset());
lookupByName(idBuffer).setReferenced();
treeParser.next();
}
}
}
private void onTag(DhtInfo obj, byte[] data) {
if (isSaveAsCachedPack()) {
idBuffer.fromString(data, 7); // "object $sha1"
lookupByName(idBuffer).setReferenced();
}
}
private DhtInfo lookupByName(AnyObjectId obj) {
DhtInfo info = objectMap.get(obj);
if (info == null) {
info = new DhtInfo(obj);
objectMap.add(info);
}
return info;
}
private Edges edges(long chunkPtr) throws DhtException {
if (isInCurrentChunk(chunkPtr)) {
int type = typeOf(chunkPtr);
Edges s = openEdges[type];
if (s == null) {
s = new Edges();
openEdges[type] = s;
}
return s;
} else {
ChunkKey key = chunkOf(chunkPtr);
Edges s = chunkEdges.get(key);
if (s == null) {
s = new Edges();
chunkEdges.put(key, s);
}
return s;
}
}
private static class Edges {
Set<DhtInfo> commitIds;
List<ChunkKey> commitEdges;
void commit(DhtInfo id) {
if (commitIds == null)
commitIds = new HashSet<DhtInfo>();
commitIds.add(id);
}
void remove(DhtInfo id) {
if (commitIds != null)
commitIds.remove(id);
}
}
@Override
protected ObjectTypeAndSize seekDatabase(PackedObjectInfo obj,
ObjectTypeAndSize info) throws IOException {
return seekDatabase(((DhtInfo) obj).chunkPtr, info);
}
@Override
protected ObjectTypeAndSize seekDatabase(UnresolvedDelta delta,
ObjectTypeAndSize info) throws IOException {
return seekDatabase(((DhtDelta) delta).chunkPtr, info);
}
private ObjectTypeAndSize seekDatabase(long chunkPtr, ObjectTypeAndSize info)
throws DhtException {
seekChunk(chunkOf(chunkPtr), true);
dbPtr = dbChunk.readObjectTypeAndSize(offsetOf(chunkPtr), info);
return info;
}
@Override
protected int readDatabase(byte[] dst, int pos, int cnt) throws IOException {
int n = dbChunk.read(dbPtr, dst, pos, cnt);
if (0 < n) {
dbPtr += n;
return n;
}
// Writing of the ChunkMeta for fragments is delayed, so it isn't
// available on the chunk if the chunk was read back from the database.
// Use our copy of the ChunkMeta instead of the PackChunk's copy.
ChunkMeta meta = chunkMeta.get(dbChunk.getChunkKey());
if (meta == null)
return 0;
ChunkKey next = ChunkMetaUtil.getNextFragment(meta, dbChunk.getChunkKey());
if (next == null)
return 0;
seekChunk(next, false);
n = dbChunk.read(0, dst, pos, cnt);
dbPtr = n;
return n;
}
private void seekChunk(ChunkKey key, boolean cache) throws DhtException,
DhtTimeoutException {
if (dbChunk == null || !dbChunk.getChunkKey().equals(key)) {
dbChunk = chunkReadBackCache.get(key);
if (dbChunk == null) {
dbWriteBuffer.flush();
Collection<PackChunk.Members> found;
Context opt = Context.READ_REPAIR;
Sync<Collection<PackChunk.Members>> sync = Sync.create();
db.chunk().get(opt, Collections.singleton(key), sync);
try {
found = sync.get(objdb.getReaderOptions().getTimeout());
} catch (InterruptedException e) {
throw new DhtTimeoutException(e);
} catch (TimeoutException e) {
throw new DhtTimeoutException(e);
}
if (found.isEmpty()) {
throw new DhtException(MessageFormat.format(
DhtText.get().missingChunk, key));
}
dbChunk = found.iterator().next().build();
if (cache)
chunkReadBackCache.put(key, dbChunk);
}
}
}
@Override
protected boolean onAppendBase(int typeCode, byte[] data,
PackedObjectInfo info) throws IOException {
return false; // This implementation does not copy base objects.
}
@Override
protected void onEndThinPack() throws IOException {
// Do nothing, this event is not relevant.
}
@Override
protected void onPackFooter(byte[] hash) throws IOException {
// TODO Combine together fractional chunks to reduce overhead.
// Fractional chunks are common for single-commit pushes since
// they are broken out by object type.
// TODO Try to combine the chunk data and its index into a single
// put call for the last chunk of each type. This would break the
// read back we do in seekDatabase during delta resolution.
// If there are deltas to be resolved the pending chunks
// will need to be reloaded later. Ensure they are stored.
//
endChunk(OBJ_COMMIT);
endChunk(OBJ_TREE);
endChunk(OBJ_BLOB);
endChunk(OBJ_TAG);
// These are only necessary during initial parsing. Drop them now.
//
objStreamPos = null;
objChunkPtrs = null;
}
@Override
protected void onObjectHeader(Source src, byte[] raw, int pos, int len)
throws IOException {
// Do nothing, the original stream headers are not used.
}
@Override
protected void onStoreStream(byte[] raw, int pos, int len)
throws IOException {
// Do nothing, the stream is being sliced and cannot be stored as-is.
}
@Override
protected boolean checkCRC(int oldCRC) {
return true; // Don't bother to check CRCs, assume the chunk is OK.
}
private ChunkFormatter begin(int typeCode) throws DhtException {
ChunkFormatter w = openChunk(typeCode);
currChunk = w;
currChunkPtr = makeObjectPointer(w, typeCode);
return w;
}
private ChunkFormatter openChunk(int typeCode) throws DhtException {
if (typeCode == 0)
throw new DhtException("Invalid internal typeCode 0");
ChunkFormatter w = openChunks[typeCode];
if (w == null) {
w = new ChunkFormatter(repo, options);
w.setSource(GitStore.ChunkInfo.Source.RECEIVE);
w.setObjectType(typeCode);
openChunks[typeCode] = w;
}
return w;
}
private ChunkKey endChunk(int typeCode) throws DhtException {
ChunkFormatter w = openChunks[typeCode];
if (w == null)
return null;
openChunks[typeCode] = null;
currChunk = null;
if (w.isEmpty())
return null;
ChunkKey key = w.end(chunkKeyDigest);
ChunkInfo info = w.getChunkInfo();
if (chunkByOrder[typeCode] == null)
chunkByOrder[typeCode] = new ArrayList<ChunkKey>();
chunkByOrder[typeCode].add(key);
infoByKey.put(key, info);
if (w.getChunkMeta() != null)
chunkMeta.put(key, w.getChunkMeta());
Edges e = openEdges[typeCode];
if (e != null) {
chunkEdges.put(key, e);
openEdges[typeCode] = null;
}
if (currFragments == null)
chunkReadBackCache.put(key, w.getPackChunk());
w.unsafePut(db, dbWriteBuffer);
return key;
}
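// Binary search over objStreamPos (the pack stream offset recorded when
// each object began) used to locate an OFS_DELTA base; the returned index
// also addresses the correlated objChunkPtrs list.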
private int findStreamIndex(long streamPosition) throws DhtException {
int high = objStreamPos.size();
int low = 0;
do {
final int mid = (low + high) >>> 1;
final long pos = objStreamPos.get(mid);
if (streamPosition < pos)
high = mid;
else if (streamPosition == pos)
return mid;
else
low = mid + 1;
} while (low < high);
throw new DhtException(MessageFormat.format(
DhtText.get().noSavedTypeForBase, Long.valueOf(streamPosition)));
}
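// An object's location is packed into a single long "chunk pointer":
// bits 63-61 hold the object type code, bits 60-32 the index of the chunk
// within chunkByOrder[type], and bits 31-0 the byte offset of the object
// within that chunk. typeOf(), chunkIdx() and offsetOf() unpack the fields.
// A chunk index equal to chunkByOrder[type].size() refers to the chunk that
// is currently open (see isInCurrentChunk()).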
private long makeObjectPointer(ChunkFormatter w, int typeCode) {
List<ChunkKey> list = chunkByOrder[typeCode];
int idx = list == null ? 0 : list.size();
int ptr = w.position();
return (((long) typeCode) << 61) | (((long) idx) << 32) | ptr;
}
private static int typeOf(long objectPtr) {
return (int) (objectPtr >>> 61);
}
private static int chunkIdx(long objectPtr) {
return ((int) ((objectPtr << 3) >>> (32 + 3)));
}
private static int offsetOf(long objectPtr) {
return (int) objectPtr;
}
private boolean isInCurrentChunk(long objectPtr) {
List<ChunkKey> list = chunkByOrder[typeOf(objectPtr)];
if (list == null)
return chunkIdx(objectPtr) == 0;
return chunkIdx(objectPtr) == list.size();
}
private ChunkKey chunkOf(long objectPtr) throws DhtException {
List<ChunkKey> list = chunkByOrder[typeOf(objectPtr)];
int idx = chunkIdx(objectPtr);
if (list == null || list.size() <= idx) {
throw new DhtException(MessageFormat.format(
DhtText.get().packParserInvalidPointer, //
Constants.typeString(typeOf(objectPtr)), //
Integer.valueOf(idx), //
Integer.valueOf(offsetOf(objectPtr))));
}
return list.get(idx);
}
private static DhtException panicCannotInsert() {
// This exception should never happen.
return new DhtException(DhtText.get().cannotInsertObject);
}
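// Because CRCs are never checked (checkCRC always returns true), DhtInfo
// and DhtDelta reuse the CRC slot inherited from their base classes as a
// small bit field: bits 0-2 hold the object type, bit 3 the REFERENCED
// flag (DhtInfo only), and bit 4 the FRAGMENTED flag.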
static class DhtInfo extends PackedObjectInfo {
private static final int REFERENCED = 1 << 3;
static final int FRAGMENTED = 1 << 4;
long chunkPtr;
long packedSize;
long inflatedSize;
ObjectId base;
DhtInfo(AnyObjectId id) {
super(id);
}
boolean isInPack() {
return chunkPtr != 0;
}
boolean isReferenced() {
return (getCRC() & REFERENCED) != 0;
}
void setReferenced() {
setCRC(getCRC() | REFERENCED);
}
boolean isFragmented() {
return (getCRC() & FRAGMENTED) != 0;
}
void setFragmented() {
setCRC(getCRC() | FRAGMENTED);
}
int getType() {
return getCRC() & 7;
}
void setType(int type) {
setCRC((getCRC() & ~7) | type);
}
ObjectInfo info(ChunkKey chunkKey) {
GitStore.ObjectInfo.Builder b = GitStore.ObjectInfo.newBuilder();
b.setObjectType(GitStore.ObjectInfo.ObjectType.valueOf(getType()));
b.setOffset(offsetOf(chunkPtr));
b.setPackedSize(packedSize);
b.setInflatedSize(inflatedSize);
if (base != null) {
byte[] t = new byte[Constants.OBJECT_ID_LENGTH];
base.copyRawTo(t, 0);
b.setDeltaBase(ByteString.copyFrom(t));
}
if (isFragmented())
b.setIsFragmented(true);
return new ObjectInfo(chunkKey, b.build());
}
}
static class DhtDelta extends UnresolvedDelta {
long chunkPtr;
long packedSize;
long inflatedSize;
int getType() {
return getCRC() & 7;
}
void setType(int type) {
setCRC((getCRC() & ~7) | type);
}
boolean isFragmented() {
return (getCRC() & DhtInfo.FRAGMENTED) != 0;
}
void setFragmented() {
setCRC(getCRC() | DhtInfo.FRAGMENTED);
}
}
}