Merge branch 'stable-3.1'

* stable-3.1:
  Clarify the limitations of gerrit+ssh in replication.config
  PushOne: Don't log refs to push at ERROR level
  PushOne: Remove redundant 'throws' declarations
  Destination: Further improve the debug logs when not pushing
  Improve logging of why a project or ref is not replicated
  ReplicationQueue: Add handling of null ReplicationTasksStorage.ReplicateRefUpdate
  Destination: Extract repeated string to a constant
  Fix flaky ReplicationConfig tests
  FanoutReplicationConfig: Make methods static where possible
  Factor out and simplify the check if Path is a *.config file
  Consistently use Files.list in FanoutReplicationConfig
  Add multiple replication configuration file support
  Use ReplicationConfig interface instead of ReplicationFileBasedConfig
  Fix failing AutoReloadConfigDecorator tests
  Move replication config parsing out of DestinationsCollection
  Extract destinations logic into a new class
  ReplicationQueue: Migrate to Flogger

Adapt existing replication logging code in master to Flogger,
to allow the merged code to build successfully on master.
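
For reference, the Flogger call pattern the adapted master code now uses
looks roughly like this (a minimal sketch; the class name is illustrative,
and in the plugin the shared repLog is defined in ReplicationQueue):

    import com.google.common.flogger.FluentLogger;

    class ReplicationLoggingSketch {
      private static final FluentLogger repLog = FluentLogger.forEnclosingClass();

      void logExamples(String project, Exception e) {
        // Fluent level selectors and printf-style placeholders replace the
        // previous logger calls; causes are attached with withCause().
        repLog.atFine().log("Skipping replication of project %s", project);
        repLog.atWarning().withCause(e).log("Error reading project %s from cache", project);
      }
    }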

Change-Id: Ice4e54ab5c1a1a53894432c3687137e4b3603c41
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index fe5dbad..782ff4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
+  }
+
+  @Override
   public synchronized int getMaxRefsToLog() {
     return currentConfig.getMaxRefsToLog();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
index bb9097e..66251a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
@@ -53,6 +55,11 @@
         continue;
       }
 
+      if (!c.getFetchRefSpecs().isEmpty()) {
+        repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName());
+        continue;
+      }
+
       // If destination for push is not set assume equal to source.
       for (RefSpec ref : c.getPushRefSpecs()) {
         if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 467db2c..4aa74c7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
@@ -30,6 +31,7 @@
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -65,6 +67,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -307,16 +310,12 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   repLog.atWarning().withCause(e).log(
                       "Error reading project %s from cache", project);
                   return false;
                 }
-                if (projectState == null) {
-                  repLog.atFine().log("Project %s does not exist", project);
-                  throw new NoSuchProjectException(project);
-                }
                 if (!projectState.statePermitsRead()) {
                   repLog.atFine().log("Project %s does not permit read", project);
                   return false;
@@ -359,13 +358,10 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   return false;
                 }
-                if (projectState == null) {
-                  throw new NoSuchProjectException(project);
-                }
                 return shouldReplicate(projectState, userProvider.get());
               })
           .call();
@@ -579,7 +575,9 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
@@ -592,6 +590,17 @@
     }
   }
 
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       repLog.atFine().log("Skipping replication of project %s", project.get());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 4cc9974..5b24bf5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return replicationConfig.getDistributionInterval();
+  }
+
+  @Override
   public String getVersion() {
     Hasher hasher = Hashing.murmur3_128().newHasher();
     hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 4be2742..36f52fa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -54,6 +54,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -319,6 +320,8 @@
       if (status.isCanceled()) {
         repLog.atInfo().log(
             "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
       } else {
         repLog.atInfo().log(
             "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
@@ -486,8 +489,8 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectState projectState = projectCache.checkedGet(projectName);
-    if (projectState == null) {
+    Optional<ProjectState> projectState = projectCache.get(projectName);
+    if (!projectState.isPresent()) {
       return Collections.emptyList();
     }
 
@@ -496,7 +499,7 @@
     boolean filter;
     PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      projectState.checkStatePermitsRead();
+      projectState.get().checkStatePermitsRead();
       forProject.check(ProjectPermission.READ);
       filter = false;
     } catch (AuthException | ResourceConflictException e) {
@@ -516,7 +519,11 @@
         }
         local = n;
       }
-      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+      local =
+          forProject
+              .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+              .stream()
+              .collect(toMap(Ref::getName, r -> r));
     }
 
     List<RemoteRefUpdate> remoteUpdatesList =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index d3d64db..99e5ee4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@
   boolean isDefaultForceUpdate();
 
   /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
    * Returns the maximum number of ref-specs to log into the replication_log whenever a push
    * operation is completed against a replication end.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index bea30d4..2631cbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -86,6 +86,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 3807e00..a8ffeec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -35,6 +37,8 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
 
 /** Manages automatic replication to remote repositories. */
@@ -49,6 +53,7 @@
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -56,14 +61,17 @@
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
+      ReplicationConfig rc,
       WorkQueue wq,
       Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
     destinations = rd;
@@ -80,12 +88,14 @@
       replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
+    distributor.stop();
     int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.atWarning().log("Canceled %d replication events during shutdown", discarded);
@@ -109,17 +119,17 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName(), false);
   }
 
-  private void fire(String projectName, String refName) {
+  private void fire(String projectName, String refName, boolean isPersisted) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    fire(Project.nameKey(projectName), null, refName, state, false);
+    fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
     state.markAllPushTasksScheduled();
   }
 
@@ -128,7 +138,8 @@
       String urlMatch,
       String refName,
       ReplicationState state,
-      boolean now) {
+      boolean now,
+      boolean isPersisted) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,8 +151,10 @@
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          replicationTasksStorage.create(
-              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          if (!isPersisted) {
+            replicationTasksStorage.create(
+                new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          }
           cfg.schedule(project, refName, uri, state, now);
         }
       } else {
@@ -159,7 +172,7 @@
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.atInfo().log("Firing pending task %s", eventKey);
-          fire(t.project, t.ref);
+          fire(t.project, t.ref, true);
           eventsReplayed.add(eventKey);
         }
       }
@@ -168,6 +181,30 @@
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.atFine().log("Pruning externally completed task: %s", task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -188,7 +225,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.atInfo().log("Firing pending task %s", event);
-        fire(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName(), false);
         eventsReplayed.add(eventKey);
       }
     }
@@ -205,4 +242,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.atSevere().withCause(e).log("error distributing tasks");
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3e6c4d4..554f8bb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -46,13 +47,19 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>                       new replication tasks under construction
+ *   .../running/<uri_sha1>/                       lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>            running replication tasks
+ *   .../waiting/<task_sha1_NN_shard>/<task_sha1>  outstanding replication tasks
  * </code>
  *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
+ *
+ * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
+ * failures when different hosts concurrently move tasks to and from the shard directories.
  */
 @Singleton
 public class ReplicationTasksStorage {
@@ -60,26 +67,50 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
-    public final String uri;
-    public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
     }
 
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
       this.ref = ref;
-      this.uri = uri.toASCIIString();
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
+    public final String uri;
+    public final String remote;
+
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
+      this.project = project;
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -107,28 +138,60 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public synchronized void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public synchronized boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
+
+    boolean started = false;
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
+    }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
   public synchronized void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+      new Task(lock, ref).reset();
     }
+    lock.release();
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : listRunning()) {
-      new Task(r).reset();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
   }
 
-  public synchronized void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+  public boolean isWaiting(PushOne push) {
+    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  }
+
+  public void finish(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
     }
+    lock.release();
   }
 
   public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -180,20 +243,73 @@
     }
   }
 
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
   private class Task {
     public final ReplicateRefUpdate update;
-    public final String json;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate update) {
-      this.update = update;
-      json = GSON.toJson(update) + "\n";
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(PushOne push, String ref) {
+      this(new UriLock(push), ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = createDir(runningUpdates).resolve(taskKey);
-      waiting = createDir(waitingUpdates).resolve(taskKey);
+      running = lock.runningDir.resolve(taskKey);
+      waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
     }
 
     public String create() {
@@ -201,6 +317,7 @@
         return taskKey;
       }
 
+      String json = GSON.toJson(update) + "\n";
       try {
         Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
         logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -213,14 +330,19 @@
       return taskKey;
     }
 
-    public void start() {
+    public boolean start() {
       rename(waiting, running);
+      return Files.exists(running);
     }
 
     public void reset() {
       rename(running, waiting);
     }
 
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
     public void finish() {
       if (disableDeleteForTesting) {
         logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -231,7 +353,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
       }
     }
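
As an illustration of the directory layout documented at the top of
ReplicationTasksStorage above (sha1 values shortened and hypothetical), a
single task with URI sha1 "ab12..." and task sha1 "cd34..." moves through
these paths via the UriLock and Task helpers:

    .../waiting/cd/cd34...       created under a shard named after the first
                                 two characters of the task sha1
    .../running/ab12.../         the per-URI lock directory, created when the
                                 URI lock is acquired
    .../running/ab12.../cd34...  the task while it is running; deleted on
                                 finish(), after which the lock directory is
                                 removed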
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0aad73b..fc3352d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor adds all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, the replication distributor is turned
+	off. By default, zero.
+
+	Turning this on is likely only useful when other processes (such
+	as other masters in the same cluster) write to the same
+	persistence store. When the distributor is used, the recommended
+	value is approximately the smallest remote.NAME.replicationDelay
+	divided by 5, so that updates are seen well before that delay
+	expires.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
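
As an illustration of the distributionInterval guidance above (the remote
name and URL are hypothetical; the parameter names come from this change and
the existing replication documentation), a site whose smallest
remote.NAME.replicationDelay is 60 seconds would use roughly:

    [replication]
      distributionInterval = 12

    [remote "mirror"]
      url = ssh://mirror.example.com/${name}.git
      replicationDelay = 60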
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c09fcd1..94f0dc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -245,9 +246,9 @@
     return push;
   }
 
-  private void setupProjectCacheMock() throws IOException {
+  private void setupProjectCacheMock() {
     projectCacheMock = mock(ProjectCache.class);
-    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
+    when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock));
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index efacae7..79b05cc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
+
+  @Test
+  public void shouldSkipFetchRefSpecs() throws Exception {
+    FileBasedConfig config = newReplicationConfig();
+    String pushRemote = "pushRemote";
+    final String aRemoteURL = "ssh://somewhere/${name}.git";
+    config.setString("remote", pushRemote, "url", aRemoteURL);
+
+    String fetchRemote = "fetchRemote";
+    config.setString("remote", fetchRemote, "url", aRemoteURL);
+    config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+  }
 }