Merge branch 'stable-3.1'

* stable-3.1:
  ReplicationQueue: Check nulls in firePendingEvents
  Log stack trace when an error occur while deleting event
  Append LF to the json string of persisted replication event
  Change default for the replicateOnStartup to false
  Don't lose ref-updated events on plugin restart

Change-Id: Ife23d8746ca1af5060c4d77f8974b707fec4625a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 4c07f40..c043baf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -66,6 +66,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
+  }
+
+  @Override
   public synchronized int getMaxRefsToLog() {
     return currentConfig.getMaxRefsToLog();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 92b07ed..1524c2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -64,6 +64,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -563,7 +564,9 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
@@ -576,6 +579,17 @@
     }
   }
 
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.debug("No longer isWaiting, can prune " + push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 2f210e0..0eeab48 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -286,6 +286,11 @@
         continue;
       }
 
+      if (!c.getFetchRefSpecs().isEmpty()) {
+        repLog.info("Ignore '{}' endpoint: not a 'push' target", c.getName());
+        continue;
+      }
+
       // If destination for push is not set assume equal to source.
       for (RefSpec ref : c.getPushRefSpecs()) {
         if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b46c278..df6cc42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -319,6 +319,8 @@
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
         repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.info("PushOp for replication to {} was denied externally", uri);
       } else {
         repLog.info(
             "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
@@ -522,7 +524,11 @@
         }
         local = n;
       }
-      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+      local =
+          forProject
+              .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+              .stream()
+              .collect(toMap(Ref::getName, r -> r));
     }
 
     List<RemoteRefUpdate> remoteUpdatesList =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index b981bc8..68fe430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -44,6 +44,13 @@
   boolean isDefaultForceUpdate();
 
   /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
    * Returns the maximum number of ref-specs to log into the replication_log whenever a push
    * operation is completed against a replication end.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 554e441..d714376 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -76,6 +76,11 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e9a60e4..0dcfc95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -34,6 +36,8 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +54,7 @@
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -57,14 +62,17 @@
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
+      ReplicationConfig rc,
       WorkQueue wq,
       Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
     destinations = rd;
@@ -81,12 +89,14 @@
       replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
+    distributor.stop();
     int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
@@ -110,17 +120,17 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName(), false);
   }
 
-  private void fire(String projectName, String refName) {
+  private void fire(String projectName, String refName, boolean isPersisted) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    fire(Project.nameKey(projectName), null, refName, state, false);
+    fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
     state.markAllPushTasksScheduled();
   }
 
@@ -129,7 +139,8 @@
       String urlMatch,
       String refName,
       ReplicationState state,
-      boolean now) {
+      boolean now,
+      boolean isPersisted) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
@@ -141,8 +152,10 @@
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          replicationTasksStorage.create(
-              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          if (!isPersisted) {
+            replicationTasksStorage.create(
+                new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          }
           cfg.schedule(project, refName, uri, state, now);
         }
       }
@@ -158,7 +171,7 @@
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.info("Firing pending task {}", eventKey);
-          fire(t.project, t.ref);
+          fire(t.project, t.ref, true);
           eventsReplayed.add(eventKey);
         }
       }
@@ -167,6 +180,30 @@
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.debug("Pruning externally completed task:" + task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -187,7 +224,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        fire(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName(), false);
         eventsReplayed.add(eventKey);
       }
     }
@@ -204,4 +241,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.error("error distributing tasks", e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3e6c4d4..d60bd11 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -46,13 +47,19 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>                       new replication tasks under construction
+ *   .../running/<uri_sha1>/                       lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>            running replication tasks
+ *   .../waiting/<task_sha1_NN_shard>/<task_sha1>  outstanding replication tasks
  * </code>
  *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
+ *
+ * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
+ * failures when different hosts concurrently move tasks to and from the shard directories.
  */
 @Singleton
 public class ReplicationTasksStorage {
@@ -60,26 +67,50 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
-    public final String uri;
-    public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
     }
 
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
       this.ref = ref;
-      this.uri = uri.toASCIIString();
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
+    public final String uri;
+    public final String remote;
+
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
+      this.project = project;
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -107,28 +138,60 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public synchronized void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public synchronized boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
+
+    boolean started = false;
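+    // Keep start() on the left of || so it runs for every ref and is not short-circuited.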
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
+    }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
   public synchronized void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+      new Task(lock, ref).reset();
     }
+    lock.release();
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : listRunning()) {
-      new Task(r).reset();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
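+        // Each subdirectory of running/ holds the tasks for a single URI;
+        // reset its tasks, then release that URI's lock.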
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
   }
 
-  public synchronized void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+  public boolean isWaiting(PushOne push) {
+    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  }
+
+  public void finish(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
     }
+    lock.release();
   }
 
   public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -180,6 +243,53 @@
     }
   }
 
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
   private class Task {
     public final ReplicateRefUpdate update;
     public final String json;
@@ -187,13 +297,21 @@
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate update) {
-      this.update = update;
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(PushOne push, String ref) {
+      this(new UriLock(push), ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
       json = GSON.toJson(update) + "\n";
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = createDir(runningUpdates).resolve(taskKey);
-      waiting = createDir(waitingUpdates).resolve(taskKey);
+      running = lock.runningDir.resolve(taskKey);
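+      // Waiting tasks are sharded by the first two chars of the task sha1.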
+      waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
     }
 
     public String create() {
@@ -213,14 +331,19 @@
       return taskKey;
     }
 
-    public void start() {
+    public boolean start() {
       rename(waiting, running);
+      return Files.exists(running);
     }
 
     public void reset() {
       rename(running, waiting);
     }
 
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
     public void finish() {
       if (disableDeleteForTesting) {
         logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -231,7 +354,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7233061..19a478d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor will add all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, the replication distributor is turned off. By
+	default, zero.
+
+	Turning this on is likely only useful when there are other processes
+	(such as other masters in the same cluster) writing to the same
+	persistence store. When the distributor is used, the recommended
+	value is approximately the smallest remote.NAME.replicationDelay
+	divided by 5, so that updates are seen well before their
+	replicationDelay expires.
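+
+	For example, with a smallest remote.NAME.replicationDelay of 60
+	seconds, a reasonable (illustrative) setting in `replication.config`
+	would be:
+
+		[replication]
+		  distributionInterval = 12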
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index f2b027c..50ae385 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -66,6 +66,27 @@
     assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
   }
 
+  @Test
+  public void shouldSkipFetchRefSpecs() throws Exception {
+    FileBasedConfig config = newReplicationConfig();
+    String pushRemote = "pushRemote";
+    final String aRemoteURL = "ssh://somewhere/${name}.git";
+    config.setString("remote", pushRemote, "url", aRemoteURL);
+
+    String fetchRemote = "fetchRemote";
+    config.setString("remote", fetchRemote, "url", aRemoteURL);
+    config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+  }
+
   private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
     ReplicationFileBasedConfig replicationConfig =
         new ReplicationFileBasedConfig(sitePaths, pluginDataPath);