Merge branch 'stable-3.1'

* stable-3.1:
  Don't reset storage when rescheduling for collisions

Change-Id: I395aedd271fdadf2aaeabcfb740f81cf42c0c33a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index fe5dbad..782ff4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
+  }
+
+  @Override
   public synchronized int getMaxRefsToLog() {
     return currentConfig.getMaxRefsToLog();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
index bb9097e..66251a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
@@ -53,6 +55,11 @@
         continue;
       }
 
+      if (!c.getFetchRefSpecs().isEmpty()) {
+        repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName());
+        continue;
+      }
+
       // If destination for push is not set assume equal to source.
       for (RefSpec ref : c.getPushRefSpecs()) {
         if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 53fb392..d53b9c0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
@@ -30,6 +31,7 @@
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -65,6 +67,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -307,16 +310,12 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   repLog.atWarning().withCause(e).log(
                       "Error reading project %s from cache", project);
                   return false;
                 }
-                if (projectState == null) {
-                  repLog.atFine().log("Project %s does not exist", project);
-                  throw new NoSuchProjectException(project);
-                }
                 if (!projectState.statePermitsRead()) {
                   repLog.atFine().log("Project %s does not permit read", project);
                   return false;
@@ -359,13 +358,10 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   return false;
                 }
-                if (projectState == null) {
-                  throw new NoSuchProjectException(project);
-                }
                 return shouldReplicate(projectState, userProvider.get());
               })
           .call();
@@ -578,7 +574,9 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
@@ -591,6 +589,17 @@
     }
   }
 
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       repLog.atFine().log("Skipping replication of project %s", project.get());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 4cc9974..5b24bf5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return replicationConfig.getDistributionInterval();
+  }
+
+  @Override
   public String getVersion() {
     Hasher hasher = Hashing.murmur3_128().newHasher();
     hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 4cff68e..199acce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -54,6 +54,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -319,6 +320,8 @@
       if (status.isCanceled()) {
         repLog.atInfo().log(
             "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
       } else {
         repLog.atInfo().log(
             "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
@@ -483,8 +486,8 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectState projectState = projectCache.checkedGet(projectName);
-    if (projectState == null) {
+    Optional<ProjectState> projectState = projectCache.get(projectName);
+    if (!projectState.isPresent()) {
       return Collections.emptyList();
     }
 
@@ -493,7 +496,7 @@
     boolean filter;
     PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      projectState.checkStatePermitsRead();
+      projectState.get().checkStatePermitsRead();
       forProject.check(ProjectPermission.READ);
       filter = false;
     } catch (AuthException | ResourceConflictException e) {
@@ -513,7 +516,11 @@
         }
         local = n;
       }
-      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+      local =
+          forProject
+              .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+              .stream()
+              .collect(toMap(Ref::getName, r -> r));
     }
 
     List<RemoteRefUpdate> remoteUpdatesList =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index d3d64db..99e5ee4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@
   boolean isDefaultForceUpdate();
 
   /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
    * Returns the maximum number of ref-specs to log into the replication_log whenever a push
    * operation is completed against a replication end.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index bea30d4..2631cbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -86,6 +86,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 3807e00..a8ffeec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -35,6 +37,8 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
 
 /** Manages automatic replication to remote repositories. */
@@ -49,6 +53,7 @@
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -56,14 +61,17 @@
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
+      ReplicationConfig rc,
       WorkQueue wq,
       Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
     destinations = rd;
@@ -80,12 +88,14 @@
       replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
+    distributor.stop();
     int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.atWarning().log("Canceled %d replication events during shutdown", discarded);
@@ -109,17 +119,17 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName(), false);
   }
 
-  private void fire(String projectName, String refName) {
+  private void fire(String projectName, String refName, boolean isPersisted) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    fire(Project.nameKey(projectName), null, refName, state, false);
+    fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
     state.markAllPushTasksScheduled();
   }
 
@@ -128,7 +138,8 @@
       String urlMatch,
       String refName,
       ReplicationState state,
-      boolean now) {
+      boolean now,
+      boolean isPersisted) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,8 +151,10 @@
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          replicationTasksStorage.create(
-              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          if (!isPersisted) {
+            replicationTasksStorage.create(
+                new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          }
           cfg.schedule(project, refName, uri, state, now);
         }
       } else {
@@ -159,7 +172,7 @@
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.atInfo().log("Firing pending task %s", eventKey);
-          fire(t.project, t.ref);
+          fire(t.project, t.ref, true);
           eventsReplayed.add(eventKey);
         }
       }
@@ -168,6 +181,30 @@
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.atFine().log("Pruning externally completed task: %s", task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -188,7 +225,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.atInfo().log("Firing pending task %s", event);
-        fire(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName(), false);
         eventsReplayed.add(eventKey);
       }
     }
@@ -205,4 +242,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.atSevere().withCause(e).log("error distributing tasks");
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3e6c4d4..554f8bb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -46,13 +47,19 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>                       new replication tasks under construction
+ *   .../running/<uri_sha1>/                       lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>            running replication tasks
+ *   .../waiting/<task_sha1_NN_shard>/<task_sha1>  outstanding replication tasks
  * </code>
  *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
+ *
+ * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
+ * failures when moving tasks to and from the shard directories from different hosts concurrently.
  */
 @Singleton
 public class ReplicationTasksStorage {
@@ -60,26 +67,50 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
-    public final String uri;
-    public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
     }
 
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
       this.ref = ref;
-      this.uri = uri.toASCIIString();
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
+    public final String uri;
+    public final String remote;
+
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
+      this.project = project;
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -107,28 +138,60 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public synchronized void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public synchronized boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
+
+    boolean started = false;
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
+    }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
   public synchronized void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+      new Task(lock, ref).reset();
     }
+    lock.release();
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : listRunning()) {
-      new Task(r).reset();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
   }
 
-  public synchronized void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+  public boolean isWaiting(PushOne push) {
+    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  }
+
+  public void finish(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
     }
+    lock.release();
   }
 
   public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -180,20 +243,73 @@
     }
   }
 
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
   private class Task {
     public final ReplicateRefUpdate update;
-    public final String json;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate update) {
-      this.update = update;
-      json = GSON.toJson(update) + "\n";
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(PushOne push, String ref) {
+      this(new UriLock(push), ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = createDir(runningUpdates).resolve(taskKey);
-      waiting = createDir(waitingUpdates).resolve(taskKey);
+      running = lock.runningDir.resolve(taskKey);
+      waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
     }
 
     public String create() {
@@ -201,6 +317,7 @@
         return taskKey;
       }
 
+      String json = GSON.toJson(update) + "\n";
       try {
         Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
         logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -213,14 +330,19 @@
       return taskKey;
     }
 
-    public void start() {
+    public boolean start() {
       rename(waiting, running);
+      return Files.exists(running);
     }
 
     public void reset() {
       rename(running, waiting);
     }
 
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
     public void finish() {
       if (disableDeleteForTesting) {
         logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -231,7 +353,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0aad73b..fc3352d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor will add all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, turn off the replication distributor. By
+	default, zero.
+
+	Turning this on is likely only useful when there are other processes
+	(such as other masters in the same cluster) writing to the same
+	persistence store. To ensure that updates are seen well before their
+	replicationDelay expires when the distributor is used, the recommended
+	value for this is approximately the smallest remote.NAME.replicationDelay
+	divided by 5.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c09fcd1..94f0dc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -245,9 +246,9 @@
     return push;
   }
 
-  private void setupProjectCacheMock() throws IOException {
+  private void setupProjectCacheMock() {
     projectCacheMock = mock(ProjectCache.class);
-    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
+    when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock));
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index efacae7..79b05cc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
+
+  @Test
+  public void shouldSkipFetchRefSpecs() throws Exception {
+    FileBasedConfig config = newReplicationConfig();
+    String pushRemote = "pushRemote";
+    final String aRemoteURL = "ssh://somewhere/${name}.git";
+    config.setString("remote", pushRemote, "url", aRemoteURL);
+
+    String fetchRemote = "fetchRemote";
+    config.setString("remote", fetchRemote, "url", aRemoteURL);
+    config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+  }
 }