/*
* Copyright (C) 2011, Google Inc.
* and other copyright owners as documented in the project's IP log.
*
* This program and the accompanying materials are made available
* under the terms of the Eclipse Distribution License v1.0 which
* accompanies this distribution, is reproduced below, and is
* available at http://www.eclipse.org/org/documents/edl-v10.php
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* - Neither the name of the Eclipse Foundation, Inc. nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package org.eclipse.jgit.storage.dht;

import static org.eclipse.jgit.lib.Constants.OBJ_COMMIT;
import static org.eclipse.jgit.lib.Constants.OBJ_TREE;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.eclipse.jgit.errors.MissingObjectException;
import org.eclipse.jgit.generated.storage.dht.proto.GitStore.ChunkMeta;
import org.eclipse.jgit.lib.AnyObjectId;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.storage.dht.DhtReader.ChunkAndOffset;
import org.eclipse.jgit.storage.dht.spi.Context;
import org.eclipse.jgit.storage.dht.spi.Database;
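
/**
 * Reads chunks from the database ahead of the current traversal.
 *
 * A rough sketch of how a reader drives this class (the {@code roots} and
 * {@code key} variables here are hypothetical; the real wiring lives in
 * DhtReader):
 *
 * <pre>
 * Prefetcher p = new Prefetcher(reader, OBJ_COMMIT);
 * p.push(reader, roots); // seed from the walk's root commits
 * PackChunk c = p.get(key); // blocks while the chunk is loading;
 *                           // null means the caller must load it
 * </pre>
 */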
class Prefetcher implements StreamingCallback<Collection<PackChunk.Members>> {
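/**
 * Lifecycle of a chunk inside the prefetcher: queued for load, load in
 * progress, load in progress with a caller blocked waiting, loaded and
 * buffered, or already handed out (or skipped) and not tracked further.
 */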
private static enum Status {
ON_QUEUE, LOADING, WAITING, READY, DONE;
}

private final Database db;
private final DhtReader.Statistics stats;
private final int objectType;
private final HashMap<ChunkKey, PackChunk> ready;
private final HashMap<ChunkKey, Status> status;
private final LinkedList<ChunkKey> queue;
private final boolean followEdgeHints;
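
// Prefetching is throttled by the estimated bytes buffered or in
// flight: loading pauses at highWaterMark and resumes once the
// pipeline drains to lowWaterMark.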
private final int averageChunkSize;
private final int highWaterMark;
private final int lowWaterMark;
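
// Mutable state below is guarded by the prefetcher's monitor.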
private boolean first = true;
private boolean automaticallyPushHints = true;
private ChunkKey stopAt;
private int bytesReady;
private int bytesLoading;
private DhtException error;

Prefetcher(DhtReader reader, int objectType) {
this.db = reader.getDatabase();
this.stats = reader.getStatistics();
this.objectType = objectType;
this.ready = new HashMap<ChunkKey, PackChunk>();
this.status = new HashMap<ChunkKey, Status>();
this.queue = new LinkedList<ChunkKey>();
this.followEdgeHints = reader.getOptions().isPrefetchFollowEdgeHints();
this.averageChunkSize = reader.getInserterOptions().getChunkSize();
this.highWaterMark = reader.getOptions().getPrefetchLimit();
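
// Resume loading once the pipeline drains to roughly four average
// chunks below the limit; e.g. a 10 MiB limit with 1 MiB chunks gives
// a 6 MiB low water mark. If the limit is under four chunks, use half
// of the limit instead.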
int lwm = (highWaterMark / averageChunkSize) - 4;
if (lwm <= 0)
lwm = (highWaterMark / averageChunkSize) / 2;
lowWaterMark = lwm * averageChunkSize;
}

boolean isType(int type) {
return objectType == type;
}

void push(DhtReader ctx, Collection<RevCommit> roots) {
// Approximate walk by using hints from the most recent commit.
// Since the commits were recently parsed by the reader, we can
// ask the reader for their chunk locations and most likely get
// cache hits.
int time = -1;
PackChunk chunk = null;
for (RevCommit cmit : roots) {
if (time < cmit.getCommitTime()) {
ChunkAndOffset p = ctx.getChunkGently(cmit);
if (p != null && p.chunk.getMeta() != null) {
time = cmit.getCommitTime();
chunk = p.chunk;
}
}
}
if (chunk != null) {
synchronized (this) {
status.put(chunk.getChunkKey(), Status.DONE);
push(chunk.getMeta());
}
}
}

void push(DhtReader ctx, RevTree start, RevTree end) throws DhtException,
MissingObjectException {
// Unlike commits, trees aren't likely to be loaded when they
// are pushed into the prefetcher. Find the tree and load it
// as necessary to get the prefetch meta established.
//
Sync<Map<ObjectIndexKey, Collection<ObjectInfo>>> sync = Sync.create();
Set<ObjectIndexKey> toFind = new HashSet<ObjectIndexKey>();
toFind.add(ObjectIndexKey.create(ctx.getRepositoryKey(), start));
toFind.add(ObjectIndexKey.create(ctx.getRepositoryKey(), end));
db.objectIndex().get(Context.READ_REPAIR, toFind, sync);
Map<ObjectIndexKey, Collection<ObjectInfo>> trees;
try {
trees = sync.get(ctx.getOptions().getTimeout());
} catch (InterruptedException e) {
throw new DhtTimeoutException(e);
} catch (TimeoutException e) {
throw new DhtTimeoutException(e);
}
ChunkKey startKey = chunk(trees.get(start));
if (startKey == null)
throw DhtReader.missing(start, OBJ_TREE);
ChunkKey endKey = chunk(trees.get(end));
if (endKey == null)
throw DhtReader.missing(end, OBJ_TREE);
synchronized (this) {
stopAt = endKey;
push(startKey);
maybeStartGet();
}
}
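
// Choose the chunk that should hold the best copy of the object:
// the first entry once ObjectInfo.sort has ranked the known copies.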
private static ChunkKey chunk(Collection<ObjectInfo> info) {
if (info == null || info.isEmpty())
return null;
List<ObjectInfo> infoList = new ArrayList<ObjectInfo>(info);
ObjectInfo.sort(infoList);
return infoList.get(0).getChunkKey();
}

void push(ChunkKey key) {
push(Collections.singleton(key));
}
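
/**
 * Queue the chunks named by this meta's prefetch hint for the object
 * type being walked. When enabled and present, edge hints are followed
 * in preference to the sequential hint list.
 */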
void push(ChunkMeta meta) {
if (meta == null)
return;
ChunkMeta.PrefetchHint hint;
switch (objectType) {
case OBJ_COMMIT:
hint = meta.getCommitPrefetch();
break;
case OBJ_TREE:
hint = meta.getTreePrefetch();
break;
default:
return;
}
if (hint != null) {
synchronized (this) {
if (followEdgeHints && 0 < hint.getEdgeCount())
push(hint.getEdgeList());
else
push(hint.getSequentialList());
}
}
}
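
// Decode the hint's string keys into ChunkKeys and enqueue them.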
private void push(List<String> list) {
List<ChunkKey> keys = new ArrayList<ChunkKey>(list.size());
for (String keyString : list)
keys.add(ChunkKey.fromString(keyString));
push(keys);
}
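
/**
 * Append chunks to the queue, skipping any already tracked. Once the
 * stopAt chunk has been queued, hint following stops, as the span the
 * caller asked for is fully covered.
 */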
void push(Iterable<ChunkKey> list) {
synchronized (this) {
for (ChunkKey key : list) {
if (status.containsKey(key))
continue;
status.put(key, Status.ON_QUEUE);
queue.add(key);
if (key.equals(stopAt)) {
automaticallyPushHints = false;
break;
}
}
if (!first)
maybeStartGet();
}
}
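
/**
 * Search the buffered chunks for the object, consuming the chunk that
 * contains it. Returns null if no buffered chunk has the object.
 */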
synchronized ChunkAndOffset find(RepositoryKey repo, AnyObjectId objId) {
for (PackChunk c : ready.values()) {
int p = c.findOffset(repo, objId);
if (0 <= p)
return new ChunkAndOffset(useReadyChunk(c.getChunkKey()), p);
}
return null;
}
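
/**
 * Fetch a chunk, blocking while it loads. A null return means the
 * prefetcher cannot supply this chunk (it was never queued, was
 * already consumed, or sits too far back in the queue) and the caller
 * should read it directly from the database.
 */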
synchronized PackChunk get(ChunkKey key) throws DhtException {
GET: for (;;) {
if (error != null)
throw error;
Status chunkStatus = status.get(key);
if (chunkStatus == null)
return null;
switch (chunkStatus) {
case ON_QUEUE:
if (queue.isEmpty()) {
// Should never happen, but let the caller load.
status.put(key, Status.DONE);
return null;
} else if (bytesReady + bytesLoading < highWaterMark) {
// Make sure it's first in the queue, start loading, and wait.
if (!queue.getFirst().equals(key)) {
int idx = queue.indexOf(key);
if (first && objectType == OBJ_COMMIT) {
// If the prefetcher has not started yet, skip all
// chunks up to this first request. Assume this
// initial out-of-order get occurred because the
// RevWalk has already parsed all of the commits
// up to this point and does not need them again.
//
for (; 0 < idx; idx--)
status.put(queue.removeFirst(), Status.DONE);
forceStartGet();
continue GET;
}
stats.access(key).cntPrefetcher_OutOfOrder++;
queue.remove(idx);
queue.addFirst(key);
}
forceStartGet();
continue GET;
} else {
// It cannot be moved up to the front of the queue
// without violating the prefetch size. Let the
// caller load the chunk out of order.
stats.access(key).cntPrefetcher_OutOfOrder++;
status.put(key, Status.DONE);
return null;
}
case LOADING: // Wait for a prefetch that is already started.
status.put(key, Status.WAITING);
//$FALL-THROUGH$
case WAITING:
stats.access(key).cntPrefetcher_WaitedForLoad++;
try {
wait();
} catch (InterruptedException e) {
throw new DhtTimeoutException(e);
}
continue GET;
case READY:
return useReadyChunk(key);
case DONE:
stats.access(key).cntPrefetcher_Revisited++;
return null;
default:
throw new IllegalStateException(key + " " + chunkStatus);
}
}
}
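
/**
 * Hand a buffered chunk to the caller, marking it consumed and, while
 * hints are still being followed, queueing the chunks it points at.
 */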
private PackChunk useReadyChunk(ChunkKey key) {
PackChunk chunk = ready.remove(key);
status.put(chunk.getChunkKey(), Status.DONE);
bytesReady -= chunk.getTotalSize();
if (automaticallyPushHints) {
push(chunk.getMeta());
maybeStartGet();
}
return chunk;
}
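
// Only start another batch after draining to the low water mark, so
// chunks are requested in fewer, larger batches.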
private void maybeStartGet() {
if (!queue.isEmpty() && bytesReady + bytesLoading <= lowWaterMark)
forceStartGet();
}

private void forceStartGet() {
// Use a LinkedHashSet so insertion order is iteration order.
// This may help a provider that loads sequentially in the
// set's iterator order to load in the order we want data.
//
LinkedHashSet<ChunkKey> toLoad = new LinkedHashSet<ChunkKey>();
while (bytesReady + bytesLoading < highWaterMark && !queue.isEmpty()) {
ChunkKey key = queue.removeFirst();
stats.access(key).cntPrefetcher_Load++;
toLoad.add(key);
status.put(key, Status.LOADING);
bytesLoading += averageChunkSize;
// For the first chunk, start immediately to reduce the
// startup latency associated with additional chunks.
if (first)
break;
}
if (!toLoad.isEmpty() && error == null)
db.chunk().get(Context.LOCAL, toLoad, this);
if (first) {
first = false;
maybeStartGet();
}
}
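
/** Accept chunks from the database as each partial batch arrives. */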
public synchronized void onPartialResult(Collection<PackChunk.Members> res) {
try {
bytesLoading -= averageChunkSize * res.size();
for (PackChunk.Members builder : res)
chunkIsReady(builder.build());
} catch (DhtException loadError) {
onError(loadError);
}
}
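
// Buffer the chunk and wake any caller blocked waiting for it.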
private void chunkIsReady(PackChunk chunk) {
ChunkKey key = chunk.getChunkKey();
ready.put(key, chunk);
bytesReady += chunk.getTotalSize();
if (status.put(key, Status.READY) == Status.WAITING)
notifyAll();
}
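
/** Accept the final batch of the bulk get, if any chunks remain. */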
public synchronized void onSuccess(Collection<PackChunk.Members> result) {
if (result != null && !result.isEmpty())
onPartialResult(result);
}
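
/** Record an asynchronous load failure and wake blocked callers. */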
public synchronized void onFailure(DhtException asyncError) {
onError(asyncError);
}

private void onError(DhtException asyncError) {
if (error == null) {
error = asyncError;
notifyAll();
}
}
}