Merge branch 'stable-3.0'

* stable-3.0:
  Revert "Revert "Do not reload config when queue is not ready""
  Fix auto-reload test by mocking a running replicationQueue
  Add replication refs-filtering before push
  Create missing repository only at the remote we are pushing to
  Remove the NewProjectCreatedListener implementation
  Consistently handle remote repository creation failures
  Revert "Do not reload config when queue is not ready"
  Do not reload config when queue is not ready
  AbstractConfigTest: Import createTempDirectory static
  Fix creation of missing repository when replicating to a Gerrit server
  Revert "Remove replication event from pending when runway is allowed"
  Trace details of the replication events cancelled
  ReplicationFileBasedConfig: Fix setting default sshConnectionTimeout
  SshHelper: Add class javadoc
  Make more classes and fields public to ease extensibility
  Allow to configure timeout for SSH connection and commands
  StartCommand:  Fix synchronization on non-final field
  Destination: Suppress FutureReturnValueIgnored warning
  Cancel auto-reload runnable at configuration shutdown
  Cancel pending replications upon shutdown
  Remove replication event from pending when runway is allowed

Change-Id: Idfa32db6de2c61eb606cacafce3d2127a8def6dc
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 04e206d..1cd2036 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -51,6 +51,7 @@
   // ReplicationConfig
   private final Provider<ReplicationQueue> replicationQueue;
   private final ScheduledExecutorService autoReloadExecutor;
+  private ScheduledFuture<?> autoReloadRunnable;
 
   @Inject
   public AutoReloadConfigDecorator(
@@ -96,9 +97,13 @@
   private synchronized void reloadIfNeeded() {
     if (isAutoReload()) {
       ReplicationQueue queue = replicationQueue.get();
+
       long lastModified = getLastModified(currentConfig);
       try {
-        if (lastModified > currentConfigTs && lastModified > lastFailedConfigTs) {
+        if (lastModified > currentConfigTs
+            && lastModified > lastFailedConfigTs
+            && queue.isRunning()
+            && !queue.isReplaying()) {
           queue.stop();
           currentConfig = loadConfig();
           currentConfigTs = lastModified;
@@ -145,6 +150,10 @@
 
   @Override
   public synchronized int shutdown() {
+    if (autoReloadRunnable != null) {
+      autoReloadRunnable.cancel(false);
+      autoReloadRunnable = null;
+    }
     return currentConfig.shutdown();
   }
 
@@ -152,8 +161,18 @@
   @Override
   public synchronized void startup(WorkQueue workQueue) {
     currentConfig.startup(workQueue);
-    ScheduledFuture<?> ignoredHere =
+    autoReloadRunnable =
         autoReloadExecutor.scheduleAtFixedRate(
             this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
+
+  @Override
+  public synchronized int getSshConnectionTimeout() {
+    return currentConfig.getSshConnectionTimeout();
+  }
+
+  @Override
+  public synchronized int getSshCommandTimeout() {
+    return currentConfig.getSshCommandTimeout();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index eeaf841..058e76a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -71,6 +71,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.lib.Constants;
@@ -225,12 +226,45 @@
   public int shutdown() {
     int cnt = 0;
     if (pool != null) {
-      cnt = pool.shutdownNow().size();
-      pool = null;
+      synchronized (stateLock) {
+        int numPending = pending.size();
+        int numInFlight = inFlight.size();
+
+        if (numPending > 0 || numInFlight > 0) {
+          repLog.warn(
+              "Cancelling replication events (pending={}, inFlight={}) for destination {}",
+              numPending,
+              numInFlight,
+              getRemoteConfigName());
+
+          foreachPushOp(
+              pending,
+              push -> {
+                push.cancel();
+                return null;
+              });
+          pending.clear();
+          foreachPushOp(
+              inFlight,
+              push -> {
+                push.setCanceledWhileRunning();
+                return null;
+              });
+          inFlight.clear();
+        }
+        cnt = pool.shutdownNow().size();
+        pool = null;
+      }
     }
     return cnt;
   }
 
+  private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+    for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
+      pushOneFunction.apply(pushOne);
+    }
+  }
+
   private boolean shouldReplicate(ProjectState state, CurrentUser user)
       throws PermissionBackendException {
     if (!config.replicateHiddenProjects()
@@ -337,7 +371,7 @@
     if (!config.replicatePermissions()) {
       PushOne e;
       synchronized (stateLock) {
-        e = pending.get(uri);
+        e = getPendingPush(uri);
       }
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
@@ -360,7 +394,7 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = pending.get(uri);
+      PushOne e = getPendingPush(uri);
       if (e == null) {
         e = opFactory.create(project, uri);
         addRef(e, ref);
@@ -378,6 +412,14 @@
     }
   }
 
+  private PushOne getPendingPush(URIish uri) {
+    PushOne e = pending.get(uri);
+    if (e != null && !e.wasCanceled()) {
+      return e;
+    }
+    return null;
+  }
+
   void pushWasCanceled(PushOne pushOp) {
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
@@ -426,7 +468,7 @@
   void reschedule(PushOne pushOp, RetryReason reason) {
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
-      PushOne pendingPushOp = pending.get(uri);
+      PushOne pendingPushOp = getPendingPush(uri);
 
       if (pendingPushOp != null) {
         // There is one PushOp instance already pending to same URI.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 856ffb1..b2d0de2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -19,7 +19,7 @@
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-class DestinationConfiguration {
+public class DestinationConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
 
@@ -40,7 +40,7 @@
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
 
-  DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
+  protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
     String name = remoteConfig.getName();
     urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 09c4820..d4e1148 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
@@ -114,6 +115,8 @@
   private final ProjectCache projectCache;
   private final CreateProjectTask.Factory createProjectFactory;
   private final AtomicBoolean canceledWhileRunning;
+  private final TransportFactory transportFactory;
+  private DynamicItem<ReplicationPushFilter> replicationPushFilter;
 
   @Inject
   PushOne(
@@ -129,6 +132,7 @@
       ReplicationMetrics m,
       ProjectCache pc,
       CreateProjectTask.Factory cpf,
+      TransportFactory tf,
       @Assisted Project.NameKey d,
       @Assisted URIish u) {
     gitManager = grm;
@@ -150,18 +154,27 @@
     createProjectFactory = cpf;
     canceledWhileRunning = new AtomicBoolean(false);
     maxRetries = p.getMaxRetries();
+    transportFactory = tf;
+  }
+
+  @Inject(optional = true)
+  public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    this.replicationPushFilter = replicationPushFilter;
   }
 
   @Override
   public void cancel() {
-    repLog.info("Replication {} was canceled", getURI());
+    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info("Replication {} was canceled while being executed", getURI());
+    repLog.info(
+        "Replication [{}] to {} was canceled while being executed",
+        HexFormat.fromInt(id),
+        getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -205,7 +218,7 @@
   }
 
   boolean wasCanceled() {
-    return canceled;
+    return canceled || canceledWhileRunning.get();
   }
 
   URIish getURI() {
@@ -437,7 +450,7 @@
 
   private void runImpl() throws IOException, PermissionBackendException {
     PushResult res;
-    try (Transport tn = Transport.open(git, uri)) {
+    try (Transport tn = transportFactory.open(git, uri)) {
       res = pushVia(tn);
     }
     updateStates(res.getRemoteUpdates());
@@ -506,7 +519,12 @@
       local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
     }
 
-    return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+    List<RemoteRefUpdate> remoteUpdatesList =
+        pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+
+    return replicationPushFilter == null || replicationPushFilter.get() == null
+        ? remoteUpdatesList
+        : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
   }
 
   private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index 375ca29..44ce79e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -21,13 +21,13 @@
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
 public class RefReplicatedEvent extends RefEvent {
-  static final String TYPE = "ref-replicated";
+  public static final String TYPE = "ref-replicated";
 
-  final String project;
-  final String ref;
-  final String targetNode;
-  final String status;
-  final Status refStatus;
+  public final String project;
+  public final String ref;
+  public final String targetNode;
+  public final String status;
+  public final Status refStatus;
 
   public RefReplicatedEvent(
       String project,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index c5c56c3..712714a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -18,7 +18,7 @@
 import com.google.gerrit.server.events.RefEvent;
 
 public class RefReplicationDoneEvent extends RefEvent {
-  static final String TYPE = "ref-replication-done";
+  public static final String TYPE = "ref-replication-done";
 
   final String project;
   final String ref;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 1c62ab7..929c538 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -47,4 +47,8 @@
   int shutdown();
 
   void startup(WorkQueue workQueue);
+
+  int getSshConnectionTimeout();
+
+  int getSshCommandTimeout();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
new file mode 100644
index 0000000..b92a54a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+/**
+ * Gerrit libModule for applying a ref-filter for outgoing replications.
+ *
+ * <p>It should be used only when an actual filter is defined, otherwise the default replication
+ * plugin behaviour will be pushing all refs without any filtering.
+ */
+public class ReplicationExtensionPointModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.itemOf(binder(), ReplicationPushFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 64eacf9..a075f12 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,6 +17,8 @@
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Strings;
@@ -29,6 +31,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
@@ -52,6 +55,7 @@
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
   private List<Destination> destinations;
   private final SitePaths site;
@@ -59,6 +63,8 @@
   private boolean replicateAllOnPluginStart;
   private boolean defaultForceUpdate;
   private int maxRefsToLog;
+  private int sshCommandTimeout;
+  private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
   private final FileBasedConfig config;
   private final Path pluginDataDir;
 
@@ -124,6 +130,18 @@
 
     maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
 
+    sshCommandTimeout =
+        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
+    sshConnectionTimeout =
+        (int)
+            ConfigUtil.getTimeUnit(
+                config,
+                "gerrit",
+                null,
+                "sshConnectionTimeout",
+                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
+                MILLISECONDS);
+
     ImmutableList.Builder<Destination> dest = ImmutableList.builder();
     for (RemoteConfig c : allRemotes(config)) {
       if (c.getURIs().isEmpty()) {
@@ -308,4 +326,14 @@
       cfg.start(workQueue);
     }
   }
+
+  @Override
+  public int getSshConnectionTimeout() {
+    return sshConnectionTimeout;
+  }
+
+  @Override
+  public int getSshCommandTimeout() {
+    return sshCommandTimeout;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 9b7e8e6..f9a2b2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -66,5 +66,7 @@
     EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class);
     EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class);
     bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class);
+
+    bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
new file mode 100644
index 0000000..eb6ba90
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * Filter that is invoked before list of remote ref updates is pushed to remote instance.
+ *
+ * <p>It can be used to filter out unwanted updates.
+ */
+@ExtensionPoint
+public interface ReplicationPushFilter {
+
+  public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 8484d2a..a0e4b57 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -47,6 +47,7 @@
   private final ReplicationState.Factory replicationStateFactory;
   private final EventsStorage eventsStorage;
   private volatile boolean running;
+  private volatile boolean replaying;
 
   @Inject
   ReplicationQueue(
@@ -82,6 +83,14 @@
     }
   }
 
+  public boolean isRunning() {
+    return running;
+  }
+
+  public boolean isReplaying() {
+    return replaying;
+  }
+
   void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
     scheduleFullSync(project, urlMatch, state, false);
   }
@@ -129,9 +138,14 @@
   }
 
   private void firePendingEvents() {
-    for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
-      repLog.info("Firing pending event {}", e);
-      onGitReferenceUpdated(e.project, e.ref);
+    replaying = true;
+    try {
+      for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
+        repLog.info("Firing pending event {}", e);
+        onGitReferenceUpdated(e.project, e.ref);
+      }
+    } finally {
+      replaying = false;
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index 849cde2..005d983 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -18,11 +18,11 @@
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
-  static final String TYPE = "ref-replication-scheduled";
+  public static final String TYPE = "ref-replication-scheduled";
 
-  final String project;
-  final String ref;
-  final String targetNode;
+  public final String project;
+  public final String ref;
+  public final String targetNode;
 
   public ReplicationScheduledEvent(String project, String ref, String targetNode) {
     super(TYPE);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index cfa95dd..f2d55de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -26,7 +26,7 @@
  * state to the stderr console.
  */
 @Singleton
-class ReplicationStateLogger implements ReplicationStateListener {
+public class ReplicationStateLogger implements ReplicationStateListener {
 
   @Override
   public void warn(String msg, ReplicationState... states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
index 68e9652..d73c101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
@@ -25,19 +25,23 @@
 import org.eclipse.jgit.util.FS;
 import org.eclipse.jgit.util.io.StreamCopyThread;
 
-class SshHelper {
-  private static final int SSH_REMOTE_TIMEOUT = 120 * 1000; // 2 minutes = 120 * 1000ms
-
+/** Utility class that provides SSH access to remote URIs. */
+public class SshHelper {
   private final Provider<SshSessionFactory> sshSessionFactoryProvider;
+  private final int commandTimeout;
+  private final int connectionTimeout;
 
   @Inject
-  SshHelper(Provider<SshSessionFactory> sshSessionFactoryProvider) {
+  protected SshHelper(
+      ReplicationConfig replicationConfig, Provider<SshSessionFactory> sshSessionFactoryProvider) {
     this.sshSessionFactoryProvider = sshSessionFactoryProvider;
+    this.commandTimeout = replicationConfig.getSshCommandTimeout();
+    this.connectionTimeout = replicationConfig.getSshConnectionTimeout();
   }
 
-  int executeRemoteSsh(URIish uri, String cmd, OutputStream errStream) throws IOException {
+  public int executeRemoteSsh(URIish uri, String cmd, OutputStream errStream) throws IOException {
     RemoteSession ssh = connect(uri);
-    Process proc = ssh.exec(cmd, 0);
+    Process proc = ssh.exec(cmd, commandTimeout);
     proc.getOutputStream().close();
     StreamCopyThread out = new StreamCopyThread(proc.getInputStream(), errStream);
     StreamCopyThread err = new StreamCopyThread(proc.getErrorStream(), errStream);
@@ -54,7 +58,7 @@
     return proc.exitValue();
   }
 
-  OutputStream newErrorBufferStream() {
+  public OutputStream newErrorBufferStream() {
     return new OutputStream() {
       private final StringBuilder out = new StringBuilder();
       private final StringBuilder line = new StringBuilder();
@@ -83,7 +87,7 @@
     };
   }
 
-  RemoteSession connect(URIish uri) throws TransportException {
-    return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, SSH_REMOTE_TIMEOUT);
+  protected RemoteSession connect(URIish uri) throws TransportException {
+    return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, connectionTimeout);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
new file mode 100644
index 0000000..ba14299
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public interface TransportFactory {
+
+  Transport open(Repository local, URIish uri) throws NotSupportedException, TransportException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
new file mode 100644
index 0000000..58c1214
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public class TransportFactoryImpl implements TransportFactory {
+
+  @Override
+  public Transport open(Repository git, URIish uri)
+      throws NotSupportedException, TransportException {
+    return Transport.open(git, uri);
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6493761..f58845d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -86,6 +86,14 @@
 :	Number of refs, that are pushed during replication, to be logged.
 	For printing all refs to the logs, use a value of 0. By default, 0.
 
+gerrit.sshCommandTimeout
+:	Timeout for SSH command execution. If 0, there is no timeout and
+	the client waits indefinitely. By default, 0.
+
+gerrit.sshConnectionTimeout
+:	Timeout for SSH connections. If 0, there is no timeout and
+        the client waits indefinitely. By default, 2 minutes.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
new file mode 100644
index 0000000..345fd8f
--- /dev/null
+++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,39 @@
+@PLUGIN@ extension points
+==============
+
+The replication plugin exposes an extension point to allow influencing the behaviour of the replication events from another plugin or a script.
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+implemented by another plugin by declaring a `provided` dependency from the replication plugin.
+
+### Install extension libModule
+
+The replication plugin's extension points are defined in the `c.g.g.p.r.ReplicationExtensionPointModule`
+that needs to be configured as libModule.
+
+Create a symbolic link from `$GERRIT_SITE/plugins/replication.jar` into `$GERRIT_SITE/lib`
+and then add the replication extension module to the `gerrit.config`.
+
+Example:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+> **NOTE**: Use and configuration of the replication plugin as library module requires a Gerrit server restart and does not support hot plugin install or upgrade.
+
+
+### Extension points
+
+* `com.googlesource.gerrit.plugins.replication.ReplicationPushFilter`
+
+  Filter out the ref updates pushed to a remote instance.
+  Only one filter at a time is supported. Filter implementation needs to bind a `DynamicItem`.
+
+  Default: no filtering
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(),ReplicationPushFilter.class).to(ReplicationPushFilterImpl.class);
+  ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index da533c6..3525129 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.nio.file.Files.createTempDirectory;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.createNiceMock;
@@ -83,7 +84,7 @@
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
-    return java.nio.file.Files.createTempDirectory(prefix);
+    return createTempDirectory(prefix);
   }
 
   protected FileBasedConfig newReplicationConfig() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 1b83625..211cafa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -151,6 +151,7 @@
 
   private void setupMocks() {
     replicationQueueMock = createNiceMock(ReplicationQueue.class);
+    expect(replicationQueueMock.isRunning()).andReturn(true);
     replay(replicationQueueMock);
 
     workQueueMock = createNiceMock(WorkQueue.class);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
new file mode 100644
index 0000000..5be6552
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -0,0 +1,384 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.IdGenerator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.AssertionFailedError;
+import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefDatabase;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.PushConnection;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PushOneTest {
+  private static final int TEST_PUSH_TIMEOUT_SECS = 10;
+
+  private GitRepositoryManager gitRepositoryManagerMock;
+  private Repository repositoryMock;
+  private PermissionBackend permissionBackendMock;
+  private PermissionBackend.WithUser withUserMock;
+  private PermissionBackend.ForProject forProjectMock;
+
+  private Destination destinationMock;
+  private RemoteConfig remoteConfigMock;
+  private RefSpec refSpecMock;
+  private CredentialsFactory credentialsFactory;
+  private PerThreadRequestScope.Scoper threadRequestScoperMock;
+  private ReplicationQueue replicationQueueMock;
+  private IdGenerator idGeneratorMock;
+  private ReplicationStateListeners replicationStateListenersMock;
+  private ReplicationMetrics replicationMetricsMock;
+  private Timer1.Context timerContextMock;
+  private ProjectCache projectCacheMock;
+  private TransportFactory transportFactoryMock;
+  private Transport transportMock;
+  private FetchConnection fetchConnection;
+  private PushConnection pushConnection;
+  private ProjectState projectStateMock;
+  private RefUpdate refUpdateMock;
+  private CreateProjectTask.Factory createProjectTaskFactoryMock;
+  private ReplicationConfig replicationConfigMock;
+  private RefDatabase refDatabaseMock;
+
+  private Project.NameKey projectNameKey;
+  private URIish urish;
+  private List<Ref> localRefs;
+
+  private Map<String, Ref> remoteRefs;
+  private CountDownLatch isCallFinished;
+  private Ref newLocalRef;
+
+  @Before
+  public void setup() throws Exception {
+    projectNameKey = Project.nameKey("fooProject");
+    urish = new URIish("http://foo.com/fooProject.git");
+
+    newLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
+
+    localRefs = Arrays.asList(newLocalRef);
+
+    Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
+    remoteRefs = new HashMap<>();
+    remoteRefs.put("fooProject", remoteRef);
+
+    isCallFinished = new CountDownLatch(1);
+
+    setupMocks();
+  }
+
+  private void setupMocks() throws Exception {
+    FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
+    config.setString("remote", "Replication", "push", "foo");
+
+    setupRefUpdateMock();
+    setupRepositoryMock(config);
+    setupGitRepoManagerMock();
+
+    projectStateMock = createNiceMock(ProjectState.class);
+    forProjectMock = createNiceMock(ForProject.class);
+    setupWithUserMock();
+    setupPermissionBackedMock();
+
+    setupDestinationMock();
+
+    setupRefSpecMock();
+    setupRemoteConfigMock();
+
+    credentialsFactory = createNiceMock(CredentialsFactory.class);
+
+    setupFetchConnectionMock();
+    setupPushConnectionMock();
+    setupRequestScopeMock();
+    replicationQueueMock = createNiceMock(ReplicationQueue.class);
+    idGeneratorMock = createNiceMock(IdGenerator.class);
+    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+
+    timerContextMock = createNiceMock(Timer1.Context.class);
+    setupReplicationMetricsMock();
+
+    setupTransportMock();
+
+    setupProjectCacheMock();
+
+    replicationConfigMock = createNiceMock(ReplicationConfig.class);
+
+    replay(
+        gitRepositoryManagerMock,
+        refUpdateMock,
+        repositoryMock,
+        permissionBackendMock,
+        destinationMock,
+        remoteConfigMock,
+        credentialsFactory,
+        threadRequestScoperMock,
+        replicationQueueMock,
+        idGeneratorMock,
+        replicationStateListenersMock,
+        replicationMetricsMock,
+        projectCacheMock,
+        timerContextMock,
+        transportFactoryMock,
+        projectStateMock,
+        withUserMock,
+        forProjectMock,
+        fetchConnection,
+        pushConnection,
+        refSpecMock,
+        refDatabaseMock,
+        replicationConfigMock);
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFilters() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(DynamicItem.itemOf(ReplicationPushFilter.class, null));
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFiltersSetup() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(null);
+  }
+
+  private void shouldPushAllRefsWithDynamicItemFilter(
+      DynamicItem<ReplicationPushFilter> replicationPushFilter)
+      throws IOException, NotSupportedException, TransportException, InterruptedException {
+    List<RemoteRefUpdate> expectedUpdates =
+        Arrays.asList(
+            new RemoteRefUpdate(
+                repositoryMock,
+                newLocalRef.getName(),
+                newLocalRef.getObjectId(),
+                "fooProject",
+                false,
+                "fooProject",
+                null));
+
+    PushResult pushResult = new PushResult();
+
+    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
+        .andReturn(pushResult)
+        .once();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  @Test
+  public void shouldBlockReplicationUsingPushFilter() throws InterruptedException, IOException {
+    DynamicItem<ReplicationPushFilter> replicationPushFilter =
+        DynamicItem.itemOf(
+            ReplicationPushFilter.class,
+            new ReplicationPushFilter() {
+
+              @Override
+              public List<RemoteRefUpdate> filter(
+                  String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+                return Collections.emptyList();
+              }
+            });
+
+    // easymock way to check if method was never called
+    expect(transportMock.push(anyObject(), anyObject()))
+        .andThrow(new AssertionFailedError())
+        .anyTimes();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    PushOne push =
+        new PushOne(
+            gitRepositoryManagerMock,
+            permissionBackendMock,
+            destinationMock,
+            remoteConfigMock,
+            replicationConfigMock,
+            credentialsFactory,
+            threadRequestScoperMock,
+            idGeneratorMock,
+            replicationStateListenersMock,
+            replicationMetricsMock,
+            projectCacheMock,
+            createProjectTaskFactoryMock,
+            transportFactoryMock,
+            projectNameKey,
+            urish);
+
+    push.setReplicationPushFilter(replicationPushFilter);
+    return push;
+  }
+
+  private void setupProjectCacheMock() throws IOException {
+    projectCacheMock = createNiceMock(ProjectCache.class);
+    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+  }
+
+  private void setupTransportMock() throws NotSupportedException, TransportException {
+    transportMock = createNiceMock(Transport.class);
+    expect(transportMock.openFetch()).andReturn(fetchConnection);
+    transportFactoryMock = createNiceMock(TransportFactory.class);
+    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+  }
+
+  private void setupReplicationMetricsMock() {
+    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
+    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+  }
+
+  private void setupRequestScopeMock() {
+    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
+    expect(threadRequestScoperMock.scope(anyObject()))
+        .andAnswer(
+            new IAnswer<Callable<Object>>() {
+              @SuppressWarnings("unchecked")
+              @Override
+              public Callable<Object> answer() throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+                return new Callable<Object>() {
+
+                  @Override
+                  public Object call() throws Exception {
+                    Object result = originalCall.call();
+                    isCallFinished.countDown();
+                    return result;
+                  }
+                };
+              }
+            })
+        .anyTimes();
+  }
+
+  private void setupPushConnectionMock() {
+    pushConnection = createNiceMock(PushConnection.class);
+    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupFetchConnectionMock() {
+    fetchConnection = createNiceMock(FetchConnection.class);
+    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupRemoteConfigMock() {
+    remoteConfigMock = createNiceMock(RemoteConfig.class);
+    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+  }
+
+  private void setupRefSpecMock() {
+    refSpecMock = createNiceMock(RefSpec.class);
+    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
+    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
+    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
+    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+  }
+
+  private void setupDestinationMock() {
+    destinationMock = createNiceMock(Destination.class);
+    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+  }
+
+  private void setupPermissionBackedMock() {
+    permissionBackendMock = createNiceMock(PermissionBackend.class);
+    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+  }
+
+  private void setupWithUserMock() {
+    withUserMock = createNiceMock(WithUser.class);
+    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+  }
+
+  private void setupGitRepoManagerMock() throws IOException {
+    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
+    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+  }
+
+  private void setupRepositoryMock(FileBasedConfig config) throws IOException {
+    repositoryMock = createNiceMock(Repository.class);
+    refDatabaseMock = createNiceMock(RefDatabase.class);
+    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
+    expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
+    expect(refDatabaseMock.getRefs()).andReturn(localRefs);
+    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+  }
+
+  private void setupRefUpdateMock() {
+    refUpdateMock = createNiceMock(RefUpdate.class);
+    expect(refUpdateMock.getOldObjectId())
+        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
+        .anyTimes();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
new file mode 100644
index 0000000..111a792
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
+  Collection<RemoteRefUpdate> expectedRemoteRefs;
+
+  public static Collection<RemoteRefUpdate> eqRemoteRef(
+      Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
+    return null;
+  }
+
+  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    this.expectedRemoteRefs = expectedRemoteRefs;
+  }
+
+  @Override
+  public boolean matches(Object argument) {
+    if (!(argument instanceof Collection)) return false;
+
+    @SuppressWarnings("unchecked")
+    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
+
+    if (expectedRemoteRefs.size() != refs.size()) return false;
+    return refs.stream()
+        .allMatch(
+            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("expected:" + expectedRemoteRefs.toString());
+  }
+
+  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
+    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
+        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
+        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
+        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
+        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
+        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
+        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
+        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
+  }
+}