Merge branch 'stable-3.1' into stable-3.2

* stable-3.1:
  Call retryDone() when giving up after lock failures
  Fix issue with task cleanup after retry

Change-Id: I6dbeaa0d21545a1903bdb11c5de5d9e8f72079c5
diff --git a/BUILD b/BUILD
index 1bf917d..ba4a21d 100644
--- a/BUILD
+++ b/BUILD
@@ -21,6 +21,7 @@
 
 junit_tests(
     name = "replication_tests",
+    timeout = "long",
     srcs = glob([
         "src/test/java/**/*Test.java",
     ]),
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index fe5dbad..782ff4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
+  }
+
+  @Override
   public synchronized int getMaxRefsToLog() {
     return currentConfig.getMaxRefsToLog();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index fa26e82..2436fee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -31,7 +31,7 @@
   }
 
   private final RemoteConfig config;
-  private final DestinationsCollection destinations;
+  private final ReplicationDestinations destinations;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
   private final Project.NameKey project;
   private final String head;
@@ -39,7 +39,7 @@
   @Inject
   CreateProjectTask(
       RemoteConfig config,
-      DestinationsCollection destinations,
+      ReplicationDestinations destinations,
       DynamicItem<AdminApiFactory> adminApiFactory,
       @Assisted Project.NameKey project,
       @Assisted String head) {
@@ -63,7 +63,8 @@
       return true;
     }
 
-    repLog.warn("Cannot create new project {} on remote site {}.", projectName, replicateURI);
+    repLog.atWarning().log(
+        "Cannot create new project %s on remote site %s.", projectName, replicateURI);
     return false;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index 4617672..8ea7227 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -55,7 +55,7 @@
       return;
     }
 
-    repLog.warn("Cannot delete project {} on remote site {}.", project, replicateURI);
+    repLog.atWarning().log("Cannot delete project %s on remote site %s.", project, replicateURI);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 62ea42b..67dae7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
@@ -30,6 +31,7 @@
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -51,6 +53,7 @@
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.util.logging.NamedFluentLogger;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Provider;
@@ -64,10 +67,13 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -81,10 +87,9 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
 
 public class Destination {
-  private static final Logger repLog = ReplicationQueue.repLog;
+  private static final NamedFluentLogger repLog = ReplicationQueue.repLog;
 
   private static final String PROJECT_NOT_AVAILABLE = "source project %s not available";
 
@@ -94,7 +99,9 @@
 
   private final ReplicationStateListener stateLog;
   private final Object stateLock = new Object();
-  private final Map<URIish, PushOne> pending = new HashMap<>();
+  // writes are covered by the stateLock, but some reads are still
+  // allowed without the lock
+  private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>();
   private final Map<URIish, PushOne> inFlight = new HashMap<>();
   private final PushOne.Factory opFactory;
   private final DeleteProjectTask.Factory deleteProjectFactory;
@@ -156,7 +163,7 @@
           builder.add(g.getUUID());
           addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
         } else {
-          repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
+          repLog.atWarning().log("Group \"%s\" not recognized, removing from authGroup", name);
         }
       }
       remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
@@ -240,11 +247,9 @@
         int numInFlight = inFlight.size();
 
         if (numPending > 0 || numInFlight > 0) {
-          repLog.warn(
-              "Cancelling replication events (pending={}, inFlight={}) for destination {}",
-              numPending,
-              numInFlight,
-              getRemoteConfigName());
+          repLog.atWarning().log(
+              "Cancelling replication events (pending=%d, inFlight=%d) for destination %s",
+              numPending, numInFlight, getRemoteConfigName());
 
           foreachPushOp(
               pending,
@@ -280,7 +285,8 @@
     if (!config.replicateHiddenProjects()
         && state.getProject().getState()
             == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
-      repLog.debug("Project {} is hidden and replication of hidden projects is disabled", name);
+      repLog.atFine().log(
+          "Project %s is hidden and replication of hidden projects is disabled", name);
       return false;
     }
 
@@ -293,10 +299,9 @@
       permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
       return true;
     } catch (AuthException e) {
-      repLog.debug(
-          "Project {} is not visible to current user {}",
-          name,
-          user.getUserName().orElse("unknown"));
+      repLog.atFine().log(
+          "Project %s is not visible to current user %s",
+          name, user.getUserName().orElse("unknown"));
       return false;
     }
   }
@@ -309,21 +314,22 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
-                  repLog.warn("Error reading project {} from cache", project, e);
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
+                  repLog.atWarning().withCause(e).log(
+                      "Error reading project %s from cache", project);
                   return false;
                 }
                 if (projectState == null) {
-                  repLog.debug("Project {} does not exist", project);
+                  repLog.atFine().log("Project %s does not exist", project);
                   throw new NoSuchProjectException(project);
                 }
                 if (!projectState.statePermitsRead()) {
-                  repLog.debug("Project {} does not permit read", project);
+                  repLog.atFine().log("Project %s does not permit read", project);
                   return false;
                 }
                 if (!shouldReplicate(projectState, userProvider.get())) {
-                  repLog.debug("Project {} should not be replicated", project);
+                  repLog.atFine().log("Project %s should not be replicated", project);
                   return false;
                 }
                 if (PushOne.ALL_REFS.equals(ref)) {
@@ -339,11 +345,9 @@
                       .ref(ref)
                       .check(RefPermission.READ);
                 } catch (AuthException e) {
-                  repLog.debug(
-                      "Ref {} on project {} is not visible to calling user {}",
-                      ref,
-                      project,
-                      userProvider.get().getUserName().orElse("unknown"));
+                  repLog.atFine().log(
+                      "Ref %s on project %s is not visible to calling user %s",
+                      ref, project, userProvider.get().getUserName().orElse("unknown"));
                   return false;
                 }
                 return true;
@@ -365,13 +369,10 @@
               () -> {
                 ProjectState projectState;
                 try {
-                  projectState = projectCache.checkedGet(project);
-                } catch (IOException e) {
+                  projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+                } catch (StorageException e) {
                   return false;
                 }
-                if (projectState == null) {
-                  throw new NoSuchProjectException(project);
-                }
                 return shouldReplicate(projectState, userProvider.get());
               })
           .call();
@@ -391,10 +392,10 @@
   void schedule(
       Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
     if (!shouldReplicate(project, ref, state)) {
-      repLog.debug("Not scheduling replication {}:{} => {}", project, ref, uri);
+      repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
       return;
     }
-    repLog.info("scheduling replication {}:{} => {}", project, ref, uri);
+    repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri);
 
     if (!config.replicatePermissions()) {
       PushOne e;
@@ -436,7 +437,8 @@
         task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
+      repLog.atInfo().log(
+          "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay());
     }
   }
 
@@ -562,6 +564,7 @@
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
             } else {
               pushOp.canceledByReplication();
+              pushOp.retryDone();
               pending.remove(uri);
               stateLog.error(
                   "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
@@ -598,13 +601,24 @@
     }
   }
 
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
+  }
+
   boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
     return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
   }
 
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
-      repLog.debug("Skipping replication of project {}", project.get());
+      repLog.atFine().log("Skipping replication of project %s", project.get());
       return false;
     }
 
@@ -616,7 +630,8 @@
 
     boolean matches = (new ReplicationFilter(projects)).matches(project);
     if (!matches) {
-      repLog.debug("Skipping replication of project {}; does not match filter", project.get());
+      repLog.atFine().log(
+          "Skipping replication of project %s; does not match filter", project.get());
     }
     return matches;
   }
@@ -627,7 +642,7 @@
 
   boolean wouldPushRef(String ref) {
     if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
-      repLog.debug("Skipping push of ref {}; it is a meta ref", ref);
+      repLog.atFine().log("Skipping push of ref %s; it is a meta ref", ref);
       return false;
     }
     if (PushOne.ALL_REFS.equals(ref)) {
@@ -638,7 +653,7 @@
         return true;
       }
     }
-    repLog.debug("Skipping push of ref {}; it does not match push ref specs", ref);
+    repLog.atFine().log("Skipping push of ref %s; it does not match push ref specs", ref);
     return false;
   }
 
@@ -688,7 +703,7 @@
     } else if (remoteNameStyle.equals("basenameOnly")) {
       name = FilenameUtils.getBaseName(name);
     } else if (!remoteNameStyle.equals("slash")) {
-      repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+      repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle);
     }
     String replacedPath = replaceName(template.getPath(), name, isSingleProjectMatch());
     return (replacedPath != null) ? template.setPath(replacedPath) : template;
@@ -774,7 +789,7 @@
       try {
         eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+        repLog.atSevere().withCause(e).log("error posting event");
       }
     }
   }
@@ -788,7 +803,7 @@
       try {
         eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+        repLog.atSevere().withCause(e).log("error posting event");
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
index 4050c9c..9b6d431 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
@@ -54,6 +56,11 @@
         continue;
       }
 
+      if (!c.getFetchRefSpecs().isEmpty()) {
+        repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName());
+        continue;
+      }
+
       // If destination for push is not set assume equal to source.
       for (RefSpec ref : c.getPushRefSpecs()) {
         if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index eaf5b27..747c0f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -107,7 +107,7 @@
         try {
           uri = new URIish(url);
         } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage());
           continue;
         }
 
@@ -115,13 +115,14 @@
           String path =
               replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
           if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
+            repLog.atWarning().log("adminURL %s does not contain ${name}", uri);
             continue;
           }
 
           uri = uri.setPath(path);
           if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            repLog.atWarning().log(
+                "adminURL '%s' is invalid: only SSH and HTTP are supported", uri);
             continue;
           }
         }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 4cc9974..5b24bf5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return replicationConfig.getDistributionInterval();
+  }
+
+  @Override
   public String getVersion() {
     Hasher hasher = Hashing.murmur3_128().newHasher();
     hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index 66130f9..fa81dd0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -62,34 +62,34 @@
 
   @Override
   public boolean createProject(Project.NameKey project, String head) {
-    repLog.info("Creating project {} on {}", project, uri);
+    repLog.atInfo().log("Creating project %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
     try {
       return httpClient
           .execute(new HttpPut(url), new HttpResponseHandler(), getContext())
           .isSuccessful();
     } catch (IOException e) {
-      repLog.error("Couldn't perform project creation on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform project creation on %s", uri);
       return false;
     }
   }
 
   @Override
   public boolean deleteProject(Project.NameKey project) {
-    repLog.info("Deleting project {} on {}", project, uri);
+    repLog.atInfo().log("Deleting project %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
     try {
       httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext());
       return true;
     } catch (IOException e) {
-      repLog.error("Couldn't perform project deletion on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform project deletion on %s", uri);
     }
     return false;
   }
 
   @Override
   public boolean updateHead(Project.NameKey project, String newHead) {
-    repLog.info("Updating head of {} on {}", project, uri);
+    repLog.atInfo().log("Updating head of %s on %s", project, uri);
     String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get()));
     try {
       HttpPut req = new HttpPut(url);
@@ -98,7 +98,7 @@
       httpClient.execute(req, new HttpResponseHandler(), getContext());
       return true;
     } catch (IOException e) {
-      repLog.error("Couldn't perform update head on {}", uri, e);
+      repLog.atSevere().withCause(e).log("Couldn't perform update head on %s", uri);
     }
     return false;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index da960e6..b092363 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -43,9 +43,9 @@
         u.disableRefLog();
         u.link(head);
       }
-      repLog.info("Created local repository: {}", uri);
+      repLog.atInfo().log("Created local repository: %s", uri);
     } catch (IOException e) {
-      repLog.error("Error creating local repository {}", uri.getPath(), e);
+      repLog.atSevere().withCause(e).log("Error creating local repository %s", uri.getPath());
       return false;
     }
     return true;
@@ -55,9 +55,9 @@
   public boolean deleteProject(Project.NameKey project) {
     try {
       recursivelyDelete(new File(uri.getPath()));
-      repLog.info("Deleted local repository: {}", uri);
+      repLog.atInfo().log("Deleted local repository: %s", uri);
     } catch (IOException e) {
-      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      repLog.atSevere().withCause(e).log("Error deleting local repository %s:\n", uri.getPath());
       return false;
     }
     return true;
@@ -71,7 +71,8 @@
         u.link(newHead);
       }
     } catch (IOException e) {
-      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      repLog.atSevere().withCause(e).log(
+          "Failed to update HEAD of repository %s to %s", uri.getPath(), newHead);
       return false;
     }
     return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index e63a350..ebc8889 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.flogger.LazyArgs.lazy;
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -37,6 +38,7 @@
 import com.google.gerrit.server.git.ProjectRunnable;
 import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
 import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.logging.TraceContext;
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -56,6 +58,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -76,7 +79,6 @@
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.Transport;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.MDC;
 
 /**
  * A push to remote operation started by {@link GitReferenceUpdatedListener}.
@@ -87,7 +89,7 @@
 class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates {
   private final ReplicationStateListener stateLog;
   static final String ALL_REFS = "..all..";
-  static final String ID_MDC_KEY = "pushOneId";
+  static final String ID_KEY = "pushOneId";
 
   // The string here needs to match the one returned by Git(versions prior to 2014) server.
   // See:
@@ -180,17 +182,16 @@
 
   @Override
   public void cancel() {
-    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
+    repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info(
-        "Replication [{}] to {} was canceled while being executed",
-        HexFormat.fromInt(id),
-        getURI());
+    repLog.atInfo().log(
+        "Replication [%s] to %s was canceled while being executed",
+        HexFormat.fromInt(id), getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -229,7 +230,7 @@
     return maxRetries == 0 || retryCount <= maxRetries;
   }
 
-  private void retryDone() {
+  void retryDone() {
     this.retrying = false;
   }
 
@@ -250,9 +251,9 @@
     if (ALL_REFS.equals(ref)) {
       delta.clear();
       pushAllRefs = true;
-      repLog.trace("Added all refs for replication to {}", uri);
+      repLog.atFinest().log("Added all refs for replication to %s", uri);
     } else if (!pushAllRefs && delta.add(ref)) {
-      repLog.trace("Added ref {} for replication to {}", ref, uri);
+      repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
     }
   }
 
@@ -330,28 +331,33 @@
   }
 
   private void runPushOperation() {
+    try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
+      doRunPushOperation();
+    }
+  }
+
+  private void doRunPushOperation() {
     // Lock the queue, and remove ourselves, so we can't be modified once
     // we start replication (instead a new instance, with the same URI, is
     // created and scheduled for a future point in time.)
     //
-    MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
     RunwayStatus status = pool.requestRunway(this);
     isCollision = false;
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
-        repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+        repLog.atInfo().log(
+            "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
       } else {
-        repLog.info(
-            "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
-            uri,
-            HexFormat.fromInt(status.getInFlightPushId()));
+        repLog.atInfo().log(
+            "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
+            uri, HexFormat.fromInt(status.getInFlightPushId()));
         pool.reschedule(this, Destination.RetryReason.COLLISION);
         isCollision = true;
       }
       return;
     }
 
-    repLog.info("Replication to {} started...", uri);
+    repLog.atInfo().log("Replication to %s started...", uri);
     Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
       long startedAt = destinationContext.getStartTime();
@@ -366,12 +372,9 @@
             config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
       }
       retryDone();
-      repLog.info(
-          "Replication to {} completed in {}ms, {}ms delay, {} retries",
-          uri,
-          elapsed,
-          delay,
-          retryCount);
+      repLog.atInfo().log(
+          "Replication to %s completed in %dms, %dms delay, %d retries",
+          uri, elapsed, delay, retryCount);
     } catch (RepositoryNotFoundException e) {
       stateLog.error(
           "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
@@ -388,7 +391,7 @@
           || msg.contains("unavailable")) {
         createRepository();
       } else {
-        repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
+        repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg);
       }
 
     } catch (NoRemoteRepositoryException e) {
@@ -398,10 +401,10 @@
     } catch (TransportException e) {
       Throwable cause = e.getCause();
       if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
-        repLog.error("Cannot replicate to {}: {}", uri, cause.getMessage());
+        repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
       } else if (e instanceof UpdateRefFailureException) {
         updateRefRetryCount++;
-        repLog.error("Cannot replicate to {} due to a lock or write ref failure", uri);
+        repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
 
         // The remote push operation should be retried.
         if (updateRefRetryCount <= maxUpdateRefRetries) {
@@ -412,17 +415,15 @@
           }
         } else {
           retryDone();
-          repLog.error(
-              "Giving up after {} '{}' failures during replication to {}",
-              updateRefRetryCount,
-              e.getMessage(),
-              uri);
+          repLog.atSevere().log(
+              "Giving up after %d '%s' failures during replication to %s",
+              updateRefRetryCount, e.getMessage(), uri);
         }
       } else {
         if (canceledWhileRunning.get()) {
           logCanceledWhileRunningException(e);
         } else {
-          repLog.error("Cannot replicate to {}", uri, e);
+          repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri);
           // The remote push operation should be retried.
           pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
         }
@@ -440,7 +441,7 @@
   }
 
   private void logCanceledWhileRunningException(TransportException e) {
-    repLog.info("Cannot replicate to {}. It was canceled while running", uri, e);
+    repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri);
   }
 
   private void createRepository() {
@@ -448,10 +449,11 @@
       try {
         Ref head = git.exactRef(Constants.HEAD);
         if (createProject(projectName, head != null ? getName(head) : null)) {
-          repLog.warn("Missing repository created; retry replication to {}", uri);
+          repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
           pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
         } else {
-          repLog.warn("Missing repository could not be created when replicating {}", uri);
+          repLog.atWarning().log(
+              "Missing repository could not be created when replicating %s", uri);
         }
       } catch (IOException ioe) {
         stateLog.error(
@@ -498,14 +500,14 @@
     }
 
     if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) {
-      repLog.info("Push to {} references: {}", uri, refUpdatesForLogging(todo));
+      repLog.atInfo().log("Push to %s references: %s", uri, lazy(() -> refUpdatesForLogging(todo)));
     } else {
-      repLog.info(
-          "Push to {} references (first {} of {} listed): {}",
+      repLog.atInfo().log(
+          "Push to %s references (first %d of %d listed): %s",
           uri,
           replConfig.getMaxRefsToLog(),
           todo.size(),
-          refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())));
+          lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
     }
 
     return tn.push(NullProgressMonitor.INSTANCE, todo);
@@ -540,8 +542,8 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectState projectState = projectCache.checkedGet(projectName);
-    if (projectState == null) {
+    Optional<ProjectState> projectState = projectCache.get(projectName);
+    if (!projectState.isPresent()) {
       return Collections.emptyList();
     }
 
@@ -550,7 +552,7 @@
     boolean filter;
     PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      projectState.checkStatePermitsRead();
+      projectState.get().checkStatePermitsRead();
       forProject.check(ProjectPermission.READ);
       filter = false;
     } catch (AuthException | ResourceConflictException e) {
@@ -570,7 +572,11 @@
         }
         local = n;
       }
-      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+      local =
+          forProject
+              .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+              .stream()
+              .collect(toMap(Ref::getName, r -> r));
     }
 
     List<RemoteRefUpdate> remoteUpdatesList =
@@ -587,7 +593,7 @@
     Map<String, Ref> remote = listRemote(tn);
     for (Ref src : local.values()) {
       if (!canPushRef(src.getName(), noPerms)) {
-        repLog.debug("Skipping push of ref {}", src.getName());
+        repLog.atFine().log("Skipping push of ref %s", src.getName());
         continue;
       }
 
@@ -604,7 +610,7 @@
     if (config.isMirror()) {
       for (Ref ref : remote.values()) {
         if (Constants.HEAD.equals(ref.getName())) {
-          repLog.debug("Skipping deletion of {}", ref.getName());
+          repLog.atFine().log("Skipping deletion of %s", ref.getName());
           continue;
         }
         RefSpec spec = matchDst(ref.getName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index 7538298..f96c157 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -42,17 +42,14 @@
     OutputStream errStream = sshHelper.newErrorBufferStream();
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Created remote repository: {}", uri);
+      repLog.atInfo().log("Created remote repository: %s", uri);
     } catch (IOException e) {
-      repLog.error(
-          "Error creating remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error creating remote repository at %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, e, cmd, errStream);
       return false;
     }
     return true;
@@ -65,17 +62,14 @@
     OutputStream errStream = sshHelper.newErrorBufferStream();
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Deleted remote repository: {}", uri);
+      repLog.atInfo().log("Deleted remote repository: %s", uri);
     } catch (IOException e) {
-      repLog.error(
-          "Error deleting remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error deleting remote repository at %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, e, cmd, errStream);
       return false;
     }
     return true;
@@ -90,16 +84,12 @@
     try {
       sshHelper.executeRemoteSsh(uri, cmd, errStream);
     } catch (IOException e) {
-      repLog.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream);
+      repLog.atSevere().log(
+          "Error updating HEAD of remote repository at %s to %s:\n"
+              + "  Exception: %s\n"
+              + "  Command: %s\n"
+              + "  Output: %s",
+          uri, newHead, e, cmd, errStream);
       return false;
     }
     return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index b978952..8bbb180 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@
   boolean isDefaultForceUpdate();
 
   /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
    * Returns the maximum number of ref-specs to log into the replication_log whenever a push
    * operation is completed against a replication end.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index e99f6b1..2631cbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -47,9 +47,9 @@
     try {
       config.load();
     } catch (ConfigInvalidException e) {
-      repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e);
+      repLog.atSevere().withCause(e).log("Config file %s is invalid: %s", cfgPath, e.getMessage());
     } catch (IOException e) {
-      repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e);
+      repLog.atSevere().withCause(e).log("Cannot read %s: %s", cfgPath, e.getMessage());
     }
     this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
     this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
@@ -86,6 +86,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
index fed09f9..60a9523 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
@@ -28,6 +28,6 @@
         systemLog,
         serverInfo,
         ReplicationQueue.REPLICATION_LOG_NAME,
-        new PatternLayout("[%d] [%X{" + PushOne.ID_MDC_KEY + "}] %m%n"));
+        new PatternLayout("[%d] %m%n"));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4625407..6af4156 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
@@ -25,6 +27,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.util.logging.NamedFluentLogger;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
@@ -35,9 +38,9 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
@@ -47,10 +50,11 @@
         ProjectDeletedListener,
         HeadUpdatedListener {
   static final String REPLICATION_LOG_NAME = "replication_log";
-  static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
+  static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME);
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -58,14 +62,17 @@
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
+      ReplicationConfig rc,
       WorkQueue wq,
       Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
     destinations = rd;
@@ -84,15 +91,17 @@
       t.setDaemon(true);
       t.start();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
+    distributor.stop();
     int discarded = destinations.get().shutdown();
     if (discarded > 0) {
-      repLog.warn("Canceled {} replication events during shutdown", discarded);
+      repLog.atWarning().log("Canceled %d replication events during shutdown", discarded);
     }
   }
 
@@ -172,7 +181,7 @@
         cfg.schedule(project, refName, uri, state, now);
       }
     } else {
-      repLog.debug("Skipping ref {} on project {}", refName, project.get());
+      repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
     }
     if (withoutState) {
       state.markAllPushTasksScheduled();
@@ -185,22 +194,46 @@
       replaying = true;
       for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         if (t == null) {
-          repLog.warn("Encountered null replication event in ReplicationTasksStorage");
+          repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage");
           continue;
         }
         try {
           fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
         } catch (URISyntaxException e) {
-          repLog.error("Encountered malformed URI for persisted event %s", t);
+          repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
         }
       }
     } catch (Throwable e) {
-      repLog.error("Unexpected error while firing pending events", e);
+      repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
     } finally {
       replaying = false;
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
+    // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.atFine().log("Pruning externally completed task: %s", task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -220,7 +253,7 @@
     for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
-        repLog.info("Firing pending task {}", event);
+        repLog.atInfo().log("Firing pending task %s", event);
         fire(event.projectName(), event.refName());
         eventsReplayed.add(eventKey);
       }
@@ -238,4 +271,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.atSevere().withCause(e).log("error distributing tasks");
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index f2d55de..3e73033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@
   @Override
   public void warn(String msg, ReplicationState... states) {
     stateWriteErr("Warning: " + msg, states);
-    repLog.warn(msg);
+    repLog.atWarning().log(msg);
   }
 
   @Override
   public void error(String msg, ReplicationState... states) {
     stateWriteErr("Error: " + msg, states);
-    repLog.error(msg);
+    repLog.atSevere().log(msg);
   }
 
   @Override
   public void error(String msg, Throwable t, ReplicationState... states) {
     stateWriteErr("Error: " + msg, states);
-    repLog.error(msg, t);
+    repLog.atSevere().withCause(t).log(msg);
   }
 
   private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index ead218b..35ecec6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -48,9 +48,9 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>                       new replication tasks under construction
+ *   .../running/<sha1>                            running replication tasks
+ *   .../waiting/<task_sha1>  outstanding replication tasks
  * </code>
  *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
@@ -114,12 +114,18 @@
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : listRunning()) {
+    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
       new Task(r).reset();
     }
   }
 
-  public synchronized void finish(UriUpdates uriUpdates) {
+  public boolean isWaiting(UriUpdates uriUpdates) {
+    return uriUpdates.getReplicateRefUpdates().stream()
+        .map(update -> new Task(update))
+        .anyMatch(Task::isWaiting);
+  }
+
+  public void finish(UriUpdates uriUpdates) {
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       new Task(update).finish();
     }
@@ -179,14 +185,12 @@
   @VisibleForTesting
   class Task {
     public final ReplicateRefUpdate update;
-    public final String json;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
     public Task(ReplicateRefUpdate update) {
       this.update = update;
-      json = GSON.toJson(update) + "\n";
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
       running = createDir(runningUpdates).resolve(taskKey);
@@ -198,6 +202,7 @@
         return taskKey;
       }
 
+      String json = GSON.toJson(update) + "\n";
       try {
         Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
         logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -218,6 +223,10 @@
       rename(running, waiting);
     }
 
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
     public void finish() {
       try {
         logger.atFine().log("DELETE %s %s", running, updateLog());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index ffa6be1..fdbd5e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -58,7 +58,8 @@
       return;
     }
 
-    repLog.warn("Cannot update HEAD of project {} on remote site {}.", project, replicateURI);
+    repLog.atWarning().log(
+        "Cannot update HEAD of project %s on remote site %s.", project, replicateURI);
   }
 
   @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 06f5f48..d606b7b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor will add all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, turn off the replication distributor. By
+	default, zero.
+
+	Turning this on is likely only useful when there are other processes
+	(such as other masters in the same cluster) writing to the same
+	persistence store. To ensure that updates are seen well before their
+	replicationDelay expires when the distributor is used, the recommended
+	value for this is approximately the smallest remote.NAME.replicationDelay
+	divided by 5.
+
 <a name="replication.updateRefErrorMaxRetries">replication.updateRefErrorMaxRetries</a>
 :	Number of times to retry a replication operation if an update
 	ref error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c09fcd1..94f0dc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -245,9 +246,9 @@
     return push;
   }
 
-  private void setupProjectCacheMock() throws IOException {
+  private void setupProjectCacheMock() {
     projectCacheMock = mock(ProjectCache.class);
-    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
+    when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock));
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index efacae7..79b05cc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
+
+  @Test
+  public void shouldSkipFetchRefSpecs() throws Exception {
+    FileBasedConfig config = newReplicationConfig();
+    String pushRemote = "pushRemote";
+    final String aRemoteURL = "ssh://somewhere/${name}.git";
+    config.setString("remote", pushRemote, "url", aRemoteURL);
+
+    String fetchRemote = "fetchRemote";
+    config.setString("remote", fetchRemote, "url", aRemoteURL);
+    config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index ce87a13..fdea8d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -46,9 +46,10 @@
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
 public class ReplicationIT extends ReplicationDaemon {
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final int TEST_REPLICATION_RETRY = 1;
   private static final Duration TEST_TIMEOUT =
-      Duration.ofSeconds(
-          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + 1);
+      Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
 
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index 67c04b2..31a4f49 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -22,8 +22,13 @@
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -32,6 +37,7 @@
 import java.util.Optional;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Test;
 
 /**
@@ -49,7 +55,14 @@
   protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
   protected static final Duration TEST_TASK_FINISH_TIMEOUT =
       Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+  private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+                  * TEST_REPLICATION_MAX_RETRIES
+              + 10);
   protected ReplicationTasksStorage tasksStorage;
+  private DestinationsCollection destinationCollection;
+  private ReplicationConfig replicationConfig;
 
   @Override
   public void setUpTestPlugin() throws Exception {
@@ -60,6 +73,8 @@
         Optional.of("not-used-project")); // Simulates a full replication.config initialization
     super.setUpTestPlugin();
     tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
+    replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
   }
 
   @Test
@@ -270,6 +285,53 @@
     WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
   }
 
+  @Test
+  public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(tasksStorage.listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
+
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
+    WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
+  }
+
+  @Test
+  public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries()
+      throws Exception {
+    String projectName = "task_cleanup_locks_project_cancelled";
+    String remoteDestination = "http://invalidurl:9090/";
+    URIish urish = new URIish(remoteDestination + projectName + ".git");
+
+    setReplicationDestination(projectName, "replica", Optional.of(projectName));
+    // replace correct urls with invalid one to trigger retry
+    config.setString("remote", projectName, "url", remoteDestination + "${name}.git");
+    config.setInt("remote", projectName, "replicationMaxRetries", TEST_REPLICATION_MAX_RETRIES);
+    config.save();
+    reloadConfig();
+    Destination destination =
+        destinationCollection.getAll(FilterType.ALL).stream()
+            .filter(dest -> dest.getProjects().contains(projectName))
+            .findFirst()
+            .get();
+
+    createTestProject(projectName);
+
+    WaitUtil.waitUntil(
+        () -> isTaskRescheduled(destination.getQueueInfo(), urish), TEST_NEW_PROJECT_TIMEOUT);
+    // replicationRetry is set to 1 minute which is the minimum value. That's why
+    // should be safe to get the pushOne object from pending because it should be
+    // here for one minute
+    PushOne pushOp = destination.getQueueInfo().pending.get(urish);
+
+    WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT);
+    WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
+  }
+
   private void replicateBranchDeletion(boolean mirror) throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
@@ -290,6 +352,21 @@
     assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
   }
 
+  private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
+    PushOne pushOne = queue.pending.get(uri);
+    return pushOne == null ? false : pushOne.isRetrying();
+  }
+
+  private boolean isTaskCleanedUp() {
+    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+    Path runningUpdates = refUpdates.resolve("running");
+    try {
+      return Files.list(runningUpdates).count() == 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  }
+
   private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
       String changeRef, String remote) {
     return tasksStorage.listWaiting().stream()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index 38c2905..d9fbbe5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -347,11 +347,11 @@
   }
 
   protected static void assertIsWaiting(Task task) {
-    assertTrue(whiteBoxIsWaiting(task));
+    assertTrue(task.isWaiting());
   }
 
   protected static void assertNotWaiting(Task task) {
-    assertFalse(whiteBoxIsWaiting(task));
+    assertFalse(task.isWaiting());
   }
 
   protected static void assertIsRunning(Task task) {
@@ -366,10 +366,6 @@
     return Files.exists(task.running);
   }
 
-  private static boolean whiteBoxIsWaiting(Task task) {
-    return Files.exists(task.waiting);
-  }
-
   public static URIish getUrish(String uri) {
     try {
       return new URIish(uri);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 97992c8..141f739 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -71,10 +72,20 @@
   }
 
   @Test
+  public void canCheckIfUpdateIsWaiting() {
+    storage.create(REF_UPDATE);
+    assertTrue(storage.isWaiting(uriUpdates));
+
+    storage.start(uriUpdates);
+    assertFalse(storage.isWaiting(uriUpdates));
+  }
+
+  @Test
   public void canStartWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
     assertThat(storage.listWaiting()).isEmpty();
+    assertFalse(storage.isWaiting(uriUpdates));
     assertContainsExactly(storage.listRunning(), REF_UPDATE);
   }
 
@@ -128,6 +139,8 @@
     String keyA = storage.create(REF_UPDATE);
     String keyB = storage.create(updateB);
     assertThat(storage.listWaiting()).hasSize(2);
+    assertTrue(storage.isWaiting(uriUpdates));
+    assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
     assertNotEquals(keyA, keyB);
   }
 
@@ -187,6 +200,8 @@
     storage.create(REF_UPDATE);
     storage.create(updateB);
     assertThat(storage.listWaiting()).hasSize(2);
+    assertTrue(storage.isWaiting(uriUpdates));
+    assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
   }
 
   @Test
@@ -198,6 +213,8 @@
     String keyB = storage.create(refB);
     assertThat(storage.listWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
+    assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
+    assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
   }
 
   @Test
@@ -236,6 +253,7 @@
 
     storage.start(uriUpdates);
     assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertFalse(storage.isWaiting(uriUpdates));
     assertThat(storage.listWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
@@ -255,6 +273,7 @@
 
     storage.resetAll();
     assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertTrue(storage.isWaiting(uriUpdates));
     assertThat(storage.listRunning()).isEmpty();
   }
 
@@ -267,6 +286,7 @@
     storage.start(uriUpdates);
     assertContainsExactly(storage.listRunning(), REF_UPDATE);
     assertThat(storage.listWaiting()).isEmpty();
+    assertFalse(storage.isWaiting(uriUpdates));
 
     storage.finish(uriUpdates);
     assertNoIncompleteTasks(storage);