Merge branch 'stable-3.1'

* stable-3.1:
  Fix log statement formatting
  GerritRestApi: Use UTF_8 from StandardCharsets instead of Guava
  Fix NPE in PushResultProcessing
  Fix start --wait to track in-flight collisions and to not fail
  Improve URLmatching to match real Urls
  Convert PushResultProcessing to an interface
  ReplicationQueue: Revert back from Flogger to slf4j logging

The change "ReplicationQueue: Revert back from Flogger to slf4j logging"
is omitted from this merge: Flogger logging has been fixed on the master
branch since change I0d18f8931, which was submitted there earlier, so
the revert is unnecessary.

Change-Id: Idb497116ee621b2fc2e984d1be679412c002fb48
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index fe5dbad..782ff4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
+  }
+
+  @Override
   public synchronized int getMaxRefsToLog() {
     return currentConfig.getMaxRefsToLog();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
index bb9097e..66251a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
@@ -53,6 +55,11 @@
         continue;
       }
 
+      if (!c.getFetchRefSpecs().isEmpty()) {
+        repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName());
+        continue;
+      }
+
       // If destination for push is not set assume equal to source.
       for (RefSpec ref : c.getPushRefSpecs()) {
         if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index fa26e82..424648e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -63,7 +63,8 @@
       return true;
     }
 
-    repLog.warn("Cannot create new project {} on remote site {}.", projectName, replicateURI);
+    repLog.atWarning().log(
+        "Cannot create new project %s on remote site %s.", projectName, replicateURI);
     return false;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index 4617672..8ea7227 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -55,7 +55,7 @@
       return;
     }
 
-    repLog.warn("Cannot delete project {} on remote site {}.", project, replicateURI);
+    repLog.atWarning().log("Cannot delete project %s on remote site %s.", project, replicateURI);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 029dc83..f485c95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
@@ -30,6 +31,7 @@
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -51,6 +53,7 @@
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.util.logging.NamedFluentLogger;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Provider;
@@ -64,10 +67,13 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -81,10 +87,9 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
 
 public class Destination {
-  private static final Logger repLog = ReplicationQueue.repLog;
+  private static final NamedFluentLogger repLog = ReplicationQueue.repLog;
 
   private static final String PROJECT_NOT_AVAILABLE = "source project %s not available";
 
@@ -94,7 +99,9 @@
 
   private final ReplicationStateListener stateLog;
   private final Object stateLock = new Object();
-  private final Map<URIish, PushOne> pending = new HashMap<>();
+  // writes are covered by the stateLock, but some reads are still
+  // allowed without the lock
+  private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>();
   private final Map<URIish, PushOne> inFlight = new HashMap<>();
   private final PushOne.Factory opFactory;
   private final DeleteProjectTask.Factory deleteProjectFactory;
@@ -156,7 +163,7 @@
           builder.add(g.getUUID());
           addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
         } else {
-          repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
+          repLog.atWarning().log("Group \"%s\" not recognized, removing from authGroup", name);
         }
       }
       remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
@@ -240,11 +247,9 @@
         int numInFlight = inFlight.size();
 
         if (numPending > 0 || numInFlight > 0) {
-          repLog.warn(
-              "Cancelling replication events (pending={}, inFlight={}) for destination {}",
-              numPending,
-              numInFlight,
-              getRemoteConfigName());
+          repLog.atWarning().log(
+              "Cancelling replication events (pending=%d, inFlight=%d) for destination %s",
+              numPending, numInFlight, getRemoteConfigName());
 
           foreachPushOp(
               pending,
@@ -280,7 +285,8 @@
     if (!config.replicateHiddenProjects()
         && state.getProject().getState()
             == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
-      repLog.debug("Project {} is hidden and replication of hidden projects is disabled", name);
+      repLog.atFine().log(
+          "Project %s is hidden and replication of hidden projects is disabled", name);
       return false;
     }
 
@@ -293,10 +299,9 @@
       permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
       return true;
     } catch (AuthException e) {
-      repLog.debug(
-          "Project {} is not visible to current user {}",
-          name,
-          user.getUserName().orElse("unknown"));
+      repLog.atFine().log(
+          "Project %s is not visible to current user %s",
+          name, user.getUserName().orElse("unknown"));
       return false;
     }
   }
@@ -309,21 +314,22 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
-                  repLog.warn("Error reading project {} from cache", project, e);
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
+                  repLog.atWarning().withCause(e).log(
+                      "Error reading project %s from cache", project);
                   return false;
                 }
                 if (projectState == null) {
-                  repLog.debug("Project {} does not exist", project);
+                  repLog.atFine().log("Project %s does not exist", project);
                   throw new NoSuchProjectException(project);
                 }
                 if (!projectState.statePermitsRead()) {
-                  repLog.debug("Project {} does not permit read", project);
+                  repLog.atFine().log("Project %s does not permit read", project);
                   return false;
                 }
                 if (!shouldReplicate(projectState, userProvider.get())) {
-                  repLog.debug("Project {} should not be replicated", project);
+                  repLog.atFine().log("Project %s should not be replicated", project);
                   return false;
                 }
                 if (PushOne.ALL_REFS.equals(ref)) {
@@ -336,11 +342,9 @@
                       .ref(ref)
                       .check(RefPermission.READ);
                 } catch (AuthException e) {
-                  repLog.debug(
-                      "Ref {} on project {} is not visible to calling user {}",
-                      ref,
-                      project,
-                      userProvider.get().getUserName().orElse("unknown"));
+                  repLog.atFine().log(
+                      "Ref %s on project %s is not visible to calling user %s",
+                      ref, project, userProvider.get().getUserName().orElse("unknown"));
                   return false;
                 }
                 return true;
@@ -362,13 +366,10 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   return false;
                 }
-                if (projectState == null) {
-                  throw new NoSuchProjectException(project);
-                }
                 return shouldReplicate(projectState, userProvider.get());
               })
           .call();
@@ -388,10 +389,10 @@
   void schedule(
       Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
     if (!shouldReplicate(project, ref, state)) {
-      repLog.debug("Not scheduling replication {}:{} => {}", project, ref, uri);
+      repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
       return;
     }
-    repLog.info("scheduling replication {}:{} => {}", project, ref, uri);
+    repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri);
 
     if (!config.replicatePermissions()) {
       PushOne e;
@@ -433,7 +434,8 @@
         task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
+      repLog.atInfo().log(
+          "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay());
     }
   }
 
@@ -580,7 +582,9 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
@@ -595,9 +599,20 @@
     }
   }
 
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
-      repLog.debug("Skipping replication of project {}", project.get());
+      repLog.atFine().log("Skipping replication of project %s", project.get());
       return false;
     }
 
@@ -609,7 +624,8 @@
 
     boolean matches = (new ReplicationFilter(projects)).matches(project);
     if (!matches) {
-      repLog.debug("Skipping replication of project {}; does not match filter", project.get());
+      repLog.atFine().log(
+          "Skipping replication of project %s; does not match filter", project.get());
     }
     return matches;
   }
@@ -620,7 +636,7 @@
 
   boolean wouldPushRef(String ref) {
     if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
-      repLog.debug("Skipping push of ref {}; it is a meta ref", ref);
+      repLog.atFine().log("Skipping push of ref %s; it is a meta ref", ref);
       return false;
     }
     if (PushOne.ALL_REFS.equals(ref)) {
@@ -631,7 +647,7 @@
         return true;
       }
     }
-    repLog.debug("Skipping push of ref {}; it does not match push ref specs", ref);
+    repLog.atFine().log("Skipping push of ref %s; it does not match push ref specs", ref);
     return false;
   }
 
@@ -671,7 +687,7 @@
     } else if (remoteNameStyle.equals("basenameOnly")) {
       name = FilenameUtils.getBaseName(name);
     } else if (!remoteNameStyle.equals("slash")) {
-      repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+      repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle);
     }
     String replacedPath = replaceName(template.getPath(), name, isSingleProjectMatch());
     return (replacedPath != null) ? template.setPath(replacedPath) : template;
@@ -757,7 +773,7 @@
       try {
         eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+        repLog.atSevere().withCause(e).log("error posting event");
       }
     }
   }
@@ -771,7 +787,7 @@
       try {
         eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+        repLog.atSevere().withCause(e).log("error posting event");
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 71c38d0..d869e83 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -106,7 +106,7 @@
         try {
           uri = new URIish(url);
         } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage());
           continue;
         }
 
@@ -114,13 +114,14 @@
           String path =
               replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
           if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
+            repLog.atWarning().log("adminURL %s does not contain ${name}", uri);
             continue;
           }
 
           uri = uri.setPath(path);
           if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            repLog.atWarning().log(
+                "adminURL '%s' is invalid: only SSH and HTTP are supported", uri);
             continue;
           }
         }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 4cc9974..5b24bf5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return replicationConfig.getDistributionInterval();
+  }
+
+  @Override
   public String getVersion() {
     Hasher hasher = Hashing.murmur3_128().newHasher();
     hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index 66130f9..fa81dd0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -62,34 +62,34 @@
 
   @Override
   public boolean createProject(Project.NameKey project, String head) {
-    repLog.info("Creating project {} on {}", project, uri);
+    repLog.atInfo().log("Creating project %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
     try {
       return httpClient
           .execute(new HttpPut(url), new HttpResponseHandler(), getContext())
           .isSuccessful();
     } catch (IOException e) {
-      repLog.error("Couldn't perform project creation on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform project creation on %s", uri);
       return false;
     }
   }
 
   @Override
   public boolean deleteProject(Project.NameKey project) {
-    repLog.info("Deleting project {} on {}", project, uri);
+    repLog.atInfo().log("Deleting project %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
     try {
       httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext());
       return true;
     } catch (IOException e) {
-      repLog.error("Couldn't perform project deletion on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform project deletion on %s", uri);
     }
     return false;
   }
 
   @Override
   public boolean updateHead(Project.NameKey project, String newHead) {
-    repLog.info("Updating head of {} on {}", project, uri);
+    repLog.atInfo().log("Updating head of %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get()));
     try {
       HttpPut req = new HttpPut(url);
@@ -98,7 +98,7 @@
       httpClient.execute(req, new HttpResponseHandler(), getContext());
       return true;
     } catch (IOException e) {
-      repLog.error("Couldn't perform update head on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform update head on %s", uri);
     }
     return false;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index da960e6..b092363 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -43,9 +43,9 @@
         u.disableRefLog();
         u.link(head);
       }
-      repLog.info("Created local repository: {}", uri);
+      repLog.atInfo().log("Created local repository: %s", uri);
     } catch (IOException e) {
-      repLog.error("Error creating local repository {}", uri.getPath(), e);
+      repLog.atSevere().withCause(e).log("Error creating local repository %s", uri.getPath());
       return false;
     }
     return true;
@@ -55,9 +55,9 @@
   public boolean deleteProject(Project.NameKey project) {
     try {
       recursivelyDelete(new File(uri.getPath()));
-      repLog.info("Deleted local repository: {}", uri);
+      repLog.atInfo().log("Deleted local repository: %s", uri);
     } catch (IOException e) {
-      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      repLog.atSevere().withCause(e).log("Error deleting local repository %s:\n", uri.getPath());
       return false;
     }
     return true;
@@ -71,7 +71,8 @@
         u.link(newHead);
       }
     } catch (IOException e) {
-      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      repLog.atSevere().withCause(e).log(
+          "Failed to update HEAD of repository %s to %s", uri.getPath(), newHead);
       return false;
     }
     return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 2cbd627..982e19e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -35,6 +35,7 @@
 import com.google.gerrit.server.git.ProjectRunnable;
 import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
 import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.logging.TraceContext;
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -54,6 +55,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -73,7 +75,6 @@
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.Transport;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.MDC;
 
 /**
  * A push to remote operation started by {@link GitReferenceUpdatedListener}.
@@ -84,7 +85,7 @@
 class PushOne implements ProjectRunnable, CanceledWhileRunning {
   private final ReplicationStateListener stateLog;
   static final String ALL_REFS = "..all..";
-  static final String ID_MDC_KEY = "pushOneId";
+  static final String ID_KEY = "pushOneId";
 
   interface Factory {
     PushOne create(Project.NameKey d, URIish u);
@@ -166,17 +167,16 @@
 
   @Override
   public void cancel() {
-    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
+    repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info(
-        "Replication [{}] to {} was canceled while being executed",
-        HexFormat.fromInt(id),
-        getURI());
+    repLog.atInfo().log(
+        "Replication [%s] to %s was canceled while being executed",
+        HexFormat.fromInt(id), getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -231,10 +231,10 @@
     if (ALL_REFS.equals(ref)) {
       delta.clear();
       pushAllRefs = true;
-      repLog.trace("Added all refs for replication to {}", uri);
+      repLog.atFinest().log("Added all refs for replication to %s", uri);
     } else if (!pushAllRefs) {
       delta.add(ref);
-      repLog.trace("Added ref {} for replication to {}", ref, uri);
+      repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
     }
   }
 
@@ -311,28 +311,35 @@
   }
 
   private void runPushOperation() {
+    try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
+      doRunPushOperation();
+    }
+  }
+
+  private void doRunPushOperation() {
     // Lock the queue, and remove ourselves, so we can't be modified once
     // we start replication (instead a new instance, with the same URI, is
     // created and scheduled for a future point in time.)
     //
-    MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
     RunwayStatus status = pool.requestRunway(this);
     isCollision = false;
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
-        repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+        repLog.atInfo().log(
+            "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
       } else {
-        repLog.info(
-            "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
-            uri,
-            HexFormat.fromInt(status.getInFlightPushId()));
+        repLog.atInfo().log(
+            "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
+            uri, HexFormat.fromInt(status.getInFlightPushId()));
         pool.reschedule(this, Destination.RetryReason.COLLISION);
         isCollision = true;
       }
       return;
     }
 
-    repLog.info("Replication to {} started...", uri);
+    repLog.atInfo().log("Replication to %s started...", uri);
     Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
       long startedAt = destinationContext.getStartTime();
@@ -346,12 +353,9 @@
         metrics.recordSlowProjectReplication(
             config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
       }
-      repLog.info(
-          "Replication to {} completed in {}ms, {}ms delay, {} retries",
-          uri,
-          elapsed,
-          delay,
-          retryCount);
+      repLog.atInfo().log(
+          "Replication to %s completed in %dms, %dms delay, %d retries",
+          uri, elapsed, delay, retryCount);
     } catch (RepositoryNotFoundException e) {
       stateLog.error(
           "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
@@ -368,7 +372,7 @@
           || msg.contains("unavailable")) {
         createRepository();
       } else {
-        repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
+        repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg);
       }
 
     } catch (NoRemoteRepositoryException e) {
@@ -378,10 +382,10 @@
     } catch (TransportException e) {
       Throwable cause = e.getCause();
       if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
-        repLog.error("Cannot replicate to {}: {}", uri, cause.getMessage());
+        repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
       } else if (e instanceof LockFailureException) {
         lockRetryCount++;
-        repLog.error("Cannot replicate to {} due to lock failure", uri);
+        repLog.atSevere().log("Cannot replicate to %s due to lock failure", uri);
 
         // The remote push operation should be retried.
         if (lockRetryCount <= maxLockRetries) {
@@ -391,14 +395,14 @@
             pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
           }
         } else {
-          repLog.error(
-              "Giving up after {} lock failures during replication to {}", lockRetryCount, uri);
+          repLog.atSevere().log(
+              "Giving up after %d lock failures during replication to %s", lockRetryCount, uri);
         }
       } else {
         if (canceledWhileRunning.get()) {
           logCanceledWhileRunningException(e);
         } else {
-          repLog.error("Cannot replicate to {}", uri, e);
+          repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri);
           // The remote push operation should be retried.
           pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
         }
@@ -416,7 +420,7 @@
   }
 
   private void logCanceledWhileRunningException(TransportException e) {
-    repLog.info("Cannot replicate to {}. It was canceled while running", uri, e);
+    repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri);
   }
 
   private void createRepository() {
@@ -424,10 +428,11 @@
       try {
         Ref head = git.exactRef(Constants.HEAD);
         if (createProject(projectName, head != null ? getName(head) : null)) {
-          repLog.warn("Missing repository created; retry replication to {}", uri);
+          repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
           pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
         } else {
-          repLog.warn("Missing repository could not be created when replicating {}", uri);
+          repLog.atWarning().log(
+              "Missing repository could not be created when replicating %s", uri);
         }
       } catch (IOException ioe) {
         stateLog.error(
@@ -474,10 +479,10 @@
     }
 
     if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) {
-      repLog.info("Push to {} references: {}", uri, todo);
+      repLog.atInfo().log("Push to %s references: %s", uri, todo);
     } else {
-      repLog.info(
-          "Push to {} references (first {} of {} listed): {}",
+      repLog.atInfo().log(
+          "Push to %s references (first %d of %d listed): %s",
           uri,
           replConfig.getMaxRefsToLog(),
           todo.size(),
@@ -489,8 +494,8 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectState projectState = projectCache.checkedGet(projectName);
-    if (projectState == null) {
+    Optional<ProjectState> projectState = projectCache.get(projectName);
+    if (!projectState.isPresent()) {
       return Collections.emptyList();
     }
 
@@ -499,7 +504,7 @@
     boolean filter;
     PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      projectState.checkStatePermitsRead();
+      projectState.get().checkStatePermitsRead();
       forProject.check(ProjectPermission.READ);
       filter = false;
     } catch (AuthException | ResourceConflictException e) {
@@ -519,7 +524,11 @@
         }
         local = n;
       }
-      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+      local =
+          forProject
+              .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+              .stream()
+              .collect(toMap(Ref::getName, r -> r));
     }
 
     List<RemoteRefUpdate> remoteUpdatesList =
@@ -536,7 +545,7 @@
     Map<String, Ref> remote = listRemote(tn);
     for (Ref src : local.values()) {
       if (!canPushRef(src.getName(), noPerms)) {
-        repLog.debug("Skipping push of ref {}", src.getName());
+        repLog.atFine().log("Skipping push of ref %s", src.getName());
         continue;
       }
 
@@ -553,7 +562,7 @@
     if (config.isMirror()) {
       for (Ref ref : remote.values()) {
         if (Constants.HEAD.equals(ref.getName())) {
-          repLog.debug("Skipping deletion of {}", ref.getName());
+          repLog.atFine().log("Skipping deletion of %s", ref.getName());
           continue;
         }
         RefSpec spec = matchDst(ref.getName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index 7538298..f96c157 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -42,17 +42,14 @@
     OutputStream errStream = sshHelper.newErrorBufferStream();
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Created remote repository: {}", uri);
+      repLog.atInfo().log("Created remote repository: %s", uri);
     } catch (IOException e) {
-      repLog.error(
-          "Error creating remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error creating remote repository at %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, e, cmd, errStream);
       return false;
     }
     return true;
@@ -65,17 +62,14 @@
     OutputStream errStream = sshHelper.newErrorBufferStream();
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Deleted remote repository: {}", uri);
+      repLog.atInfo().log("Deleted remote repository: %s", uri);
     } catch (IOException e) {
-      repLog.error(
-          "Error deleting remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error deleting remote repository at %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, e, cmd, errStream);
       return false;
     }
     return true;
@@ -90,16 +84,12 @@
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
     } catch (IOException e) {
-      repLog.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error updating HEAD of remote repository at %s to %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, newHead, e, cmd, errStream);
       return false;
     }
     return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index d3d64db..99e5ee4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@
   boolean isDefaultForceUpdate();
 
   /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
    * Returns the maximum number of ref-specs to log into the replication_log whenever a push
    * operation is completed against a replication end.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index e99f6b1..2631cbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -47,9 +47,9 @@
     try {
       config.load();
     } catch (ConfigInvalidException e) {
-      repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e);
+      repLog.atSevere().withCause(e).log("Config file %s is invalid: %s", cfgPath, e.getMessage());
     } catch (IOException e) {
-      repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e);
+      repLog.atSevere().withCause(e).log("Cannot read %s: %s", cfgPath, e.getMessage());
     }
     this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
     this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
@@ -86,6 +86,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
index fed09f9..60a9523 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
@@ -28,6 +28,6 @@
         systemLog,
         serverInfo,
         ReplicationQueue.REPLICATION_LOG_NAME,
-        new PatternLayout("[%d] [%X{" + PushOne.ID_MDC_KEY + "}] %m%n"));
+        new PatternLayout("[%d] %m%n"));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 869f728..a8ffeec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -25,6 +27,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.util.logging.NamedFluentLogger;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
@@ -34,9 +37,9 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
@@ -46,10 +49,11 @@
         ProjectDeletedListener,
         HeadUpdatedListener {
   static final String REPLICATION_LOG_NAME = "replication_log";
-  static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
+  static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME);
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -57,14 +61,17 @@
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
+      ReplicationConfig rc,
       WorkQueue wq,
       Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
     destinations = rd;
@@ -81,15 +88,17 @@
       replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
+    distributor.stop();
     int discarded = destinations.get().shutdown();
     if (discarded > 0) {
-      repLog.warn("Canceled {} replication events during shutdown", discarded);
+      repLog.atWarning().log("Canceled %d replication events during shutdown", discarded);
     }
   }
 
@@ -110,17 +119,17 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName(), false);
   }
 
-  private void fire(String projectName, String refName) {
+  private void fire(String projectName, String refName, boolean isPersisted) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    fire(Project.nameKey(projectName), null, refName, state, false);
+    fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
     state.markAllPushTasksScheduled();
   }
 
@@ -129,7 +138,8 @@
       String urlMatch,
       String refName,
       ReplicationState state,
-      boolean now) {
+      boolean now,
+      boolean isPersisted) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
@@ -141,12 +151,14 @@
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          replicationTasksStorage.create(
-              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          if (!isPersisted) {
+            replicationTasksStorage.create(
+                new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          }
           cfg.schedule(project, refName, uri, state, now);
         }
       } else {
-        repLog.debug("Skipping ref {} on project {}", refName, project.get());
+        repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
       }
     }
   }
@@ -159,8 +171,8 @@
       for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
-          repLog.info("Firing pending task {}", eventKey);
-          fire(t.project, t.ref);
+          repLog.atInfo().log("Firing pending task %s", eventKey);
+          fire(t.project, t.ref, true);
           eventsReplayed.add(eventKey);
         }
       }
@@ -169,6 +181,30 @@
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.atFine().log("Pruning externally completed task: %s", task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -188,8 +224,8 @@
     for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
-        repLog.info("Firing pending task {}", event);
-        fire(event.projectName(), event.refName());
+        repLog.atInfo().log("Firing pending task %s", event);
+        fire(event.projectName(), event.refName(), false);
         eventsReplayed.add(eventKey);
       }
     }
@@ -206,4 +242,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.atSevere().withCause(e).log("error distributing tasks");
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index f2d55de..3e73033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@
   @Override
   public void warn(String msg, ReplicationState... states) {
     stateWriteErr("Warning: " + msg, states);
-    repLog.warn(msg);
+    repLog.atWarning().log(msg);
   }
 
   @Override
   public void error(String msg, ReplicationState... states) {
     stateWriteErr("Error: " + msg, states);
-    repLog.error(msg);
+    repLog.atSevere().log(msg);
   }
 
   @Override
   public void error(String msg, Throwable t, ReplicationState... states) {
     stateWriteErr("Error: " + msg, states);
-    repLog.error(msg, t);
+    repLog.atSevere().withCause(t).log(msg);
   }
 
   private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 8d4b10a..c764161 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.nio.file.DirectoryIteratorException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -47,13 +48,19 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>                       new replication tasks under construction
+ *   .../running/<uri_sha1>/                       lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>            running replication tasks
+ *   .../waiting/<task_sha1_NN_shard>/<task_sha1>  outstanding replication tasks
  * </code>
  *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
+ *
+ * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
+ * failures when moving tasks to and from the shard directories from different hosts concurrently.
  */
 @Singleton
 public class ReplicationTasksStorage {
@@ -61,26 +68,50 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
-    public final String uri;
-    public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
     }
 
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
       this.ref = ref;
-      this.uri = uri.toASCIIString();
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
+    public final String uri;
+    public final String remote;
+
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
+      this.project = project;
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -108,28 +139,60 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public synchronized void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public synchronized boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
+
+    boolean started = false;
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
+    }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
   public synchronized void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+      new Task(lock, ref).reset();
     }
+    lock.release();
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : listRunning()) {
-      new Task(r).reset();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
   }
 
-  public synchronized void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+  public boolean isWaiting(PushOne push) {
+    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  }
+
+  public void finish(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
     }
+    lock.release();
   }
 
   public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -186,20 +249,73 @@
     }
   }
 
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
   private class Task {
     public final ReplicateRefUpdate update;
-    public final String json;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate update) {
-      this.update = update;
-      json = GSON.toJson(update) + "\n";
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(PushOne push, String ref) {
+      this(new UriLock(push), ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = createDir(runningUpdates).resolve(taskKey);
-      waiting = createDir(waitingUpdates).resolve(taskKey);
+      running = lock.runningDir.resolve(taskKey);
+      waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
     }
 
     public String create() {
@@ -207,6 +323,7 @@
         return taskKey;
       }
 
+      String json = GSON.toJson(update) + "\n";
       try {
         Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
         logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -219,14 +336,19 @@
       return taskKey;
     }
 
-    public void start() {
+    public boolean start() {
       rename(waiting, running);
+      return Files.exists(running);
     }
 
     public void reset() {
       rename(running, waiting);
     }
 
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
     public void finish() {
       if (disableDeleteForTesting) {
         logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -237,7 +359,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index ffa6be1..fdbd5e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -58,7 +58,8 @@
       return;
     }
 
-    repLog.warn("Cannot update HEAD of project {} on remote site {}.", project, replicateURI);
+    repLog.atWarning().log(
+        "Cannot update HEAD of project %s on remote site %s.", project, replicateURI);
   }
 
   @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0aad73b..fc3352d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor will add all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, the replication distributor is turned
+	off. By default, zero.
+
+	Turning this on is likely only useful when there are other processes
+	(such as other masters in the same cluster) writing to the same
+	persistence store. To ensure that updates are seen well before their
+	replicationDelay expires when the distributor is used, the recommended
+	value for this is approximately the smallest remote.NAME.replicationDelay
+	divided by 5.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c09fcd1..94f0dc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -245,9 +246,9 @@
     return push;
   }
 
-  private void setupProjectCacheMock() throws IOException {
+  private void setupProjectCacheMock() {
     projectCacheMock = mock(ProjectCache.class);
-    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
+    when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock));
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index efacae7..79b05cc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
+
+  @Test
+  public void shouldSkipFetchRefSpecs() throws Exception {
+    FileBasedConfig config = newReplicationConfig();
+    String pushRemote = "pushRemote";
+    final String aRemoteURL = "ssh://somewhere/${name}.git";
+    config.setString("remote", pushRemote, "url", aRemoteURL);
+
+    String fetchRemote = "fetchRemote";
+    config.setString("remote", fetchRemote, "url", aRemoteURL);
+    config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+  }
 }