Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Move storage portion of replicateBranchDeletion ITs
  Refactor Replication*IT tests to share a base class
  ReplicationIT: Add shouldMatch* e2e tests
  ReplicationStorageIT: Move shouldMatch* tests from ReplicationIT
  ReplicationStorageIT: Add shouldFire*ChangeRefs tests
  Move storage-based ITs into ReplicationStorageIT
  ReplicationQueue: Remove unused method

This change does not try to reimpose the breakdown of tests that was
done in 3.0. That will be done in follow up change(s) to improve
reviewability of this change.

Change-Id: I81d27fd47da8eecad3aca36d8e6400679fb564a3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index acbf763..f3d2103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 
 public interface AdminApi {
   public boolean createProject(Project.NameKey project, String head);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index a43d7d9..fe5dbad 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,133 +14,46 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.FileUtil;
-import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.lib.Config;
 
 @Singleton
-public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener {
   private static final long RELOAD_DELAY = 120;
   private static final long RELOAD_INTERVAL = 60;
 
-  private volatile ReplicationFileBasedConfig currentConfig;
-  private long currentConfigTs;
-  private long lastFailedConfigTs;
+  private volatile ReplicationConfig currentConfig;
 
-  private final SitePaths site;
-  private final Destination.Factory destinationFactory;
-  private final Path pluginDataDir;
-  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
-  // ReplicationConfig
-  private final Provider<ReplicationQueue> replicationQueue;
   private final ScheduledExecutorService autoReloadExecutor;
   private ScheduledFuture<?> autoReloadRunnable;
-
-  private volatile boolean shuttingDown;
+  private final AutoReloadRunnable reloadRunner;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site,
-      Destination.Factory destinationFactory,
-      Provider<ReplicationQueue> replicationQueue,
-      @PluginData Path pluginDataDir,
       @PluginName String pluginName,
-      WorkQueue workQueue)
-      throws ConfigInvalidException, IOException {
-    this.site = site;
-    this.destinationFactory = destinationFactory;
-    this.pluginDataDir = pluginDataDir;
-    this.currentConfig = loadConfig();
-    this.currentConfigTs = getLastModified(currentConfig);
-    this.replicationQueue = replicationQueue;
+      WorkQueue workQueue,
+      @MainReplicationConfig ReplicationConfig replicationConfig,
+      AutoReloadRunnable reloadRunner,
+      EventBus eventBus) {
+    this.currentConfig = replicationConfig;
     this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config");
-  }
-
-  private static long getLastModified(ReplicationFileBasedConfig cfg) {
-    return FileUtil.lastModified(cfg.getCfgPath());
-  }
-
-  private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
-  }
-
-  private synchronized boolean isAutoReload() {
-    return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false);
-  }
-
-  @Override
-  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
-    reloadIfNeeded();
-    return currentConfig.getDestinations(uri, project, ref);
-  }
-
-  @Override
-  public synchronized List<Destination> getDestinations(FilterType filterType) {
-    return currentConfig.getDestinations(filterType);
-  }
-
-  @Override
-  public synchronized Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    return currentConfig.getURIs(remoteName, projectName, filterType);
-  }
-
-  private synchronized void reloadIfNeeded() {
-    reload(false);
+    this.reloadRunner = reloadRunner;
+    eventBus.register(this);
   }
 
   @VisibleForTesting
-  public void forceReload() {
-    reload(true);
-  }
-
-  private void reload(boolean force) {
-    if (force || isAutoReload()) {
-      ReplicationQueue queue = replicationQueue.get();
-
-      long lastModified = getLastModified(currentConfig);
-      try {
-        if (force
-            || (!shuttingDown
-                && lastModified > currentConfigTs
-                && lastModified > lastFailedConfigTs
-                && queue.isRunning()
-                && !queue.isReplaying())) {
-          queue.stop();
-          currentConfig = loadConfig();
-          currentConfigTs = lastModified;
-          lastFailedConfigTs = 0;
-          logger.atInfo().log(
-              "Configuration reloaded: %d destinations",
-              currentConfig.getDestinations(FilterType.ALL).size());
-        }
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log(
-            "Cannot reload replication configuration: keeping existing settings");
-        lastFailedConfigTs = lastModified;
-        return;
-      } finally {
-        queue.start();
-      }
-    }
+  public void reload() {
+    reloadRunner.reload();
   }
 
   @Override
@@ -159,45 +72,36 @@
   }
 
   @Override
-  public synchronized boolean isEmpty() {
-    return currentConfig.isEmpty();
-  }
-
-  @Override
   public Path getEventsDirectory() {
     return currentConfig.getEventsDirectory();
   }
 
-  /* shutdown() cannot be set as a synchronized method because
-   * it may need to wait for pending events to complete;
-   * e.g. when enabling the drain of replication events before
-   * shutdown.
-   *
-   * As a rule of thumb for synchronized methods, because they
-   * implicitly define a critical section and associated lock,
-   * they should never hold waiting for another resource, otherwise
-   * the risk of deadlock is very high.
-   *
-   * See more background about deadlocks, what they are and how to
-   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
-   */
   @Override
-  public int shutdown() {
-    this.shuttingDown = true;
-    if (autoReloadRunnable != null) {
-      autoReloadRunnable.cancel(false);
-      autoReloadRunnable = null;
-    }
-    return currentConfig.shutdown();
+  public synchronized void start() {
+    autoReloadRunnable =
+        autoReloadExecutor.scheduleAtFixedRate(
+            reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
 
   @Override
-  public synchronized void startup(WorkQueue workQueue) {
-    shuttingDown = false;
-    currentConfig.startup(workQueue);
-    autoReloadRunnable =
-        autoReloadExecutor.scheduleAtFixedRate(
-            this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
+  public synchronized void stop() {
+    if (autoReloadRunnable != null) {
+      if (!autoReloadRunnable.cancel(true)) {
+        throw new IllegalStateException(
+            "Unable to cancel replication reload task: cannot guarantee orderly shutdown");
+      }
+      autoReloadRunnable = null;
+    }
+  }
+
+  @Override
+  public String getVersion() {
+    return currentConfig.getVersion();
+  }
+
+  @Subscribe
+  public void onReload(ReplicationConfig newConfig) {
+    currentConfig = newConfig;
   }
 
   @Override
@@ -209,4 +113,9 @@
   public synchronized int getSshCommandTimeout() {
     return currentConfig.getSshCommandTimeout();
   }
+
+  @Override
+  public Config getConfig() {
+    return currentConfig.getConfig();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
new file mode 100644
index 0000000..71f7c67
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.util.List;
+
+public class AutoReloadRunnable implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final EventBus eventBus;
+  private final Provider<ObservableQueue> queueObserverProvider;
+  private final ConfigParser configParser;
+  private ReplicationConfig loadedConfig;
+  private Provider<ReplicationConfig> replicationConfigProvider;
+  private String loadedConfigVersion;
+  private String lastFailedConfigVersion;
+
+  @Inject
+  public AutoReloadRunnable(
+      ConfigParser configParser,
+      @MainReplicationConfig Provider<ReplicationConfig> replicationConfigProvider,
+      EventBus eventBus,
+      Provider<ObservableQueue> queueObserverProvider) {
+    this.replicationConfigProvider = replicationConfigProvider;
+    this.loadedConfig = replicationConfigProvider.get();
+    this.loadedConfigVersion = loadedConfig.getVersion();
+    this.lastFailedConfigVersion = "";
+    this.eventBus = eventBus;
+    this.queueObserverProvider = queueObserverProvider;
+    this.configParser = configParser;
+  }
+
+  @Override
+  public synchronized void run() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    ObservableQueue queue = queueObserverProvider.get();
+    if (pendingConfigVersion.equals(loadedConfigVersion)
+        || pendingConfigVersion.equals(lastFailedConfigVersion)
+        || !queue.isRunning()
+        || queue.isReplaying()) {
+      return;
+    }
+
+    reload();
+  }
+
+  synchronized void reload() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    try {
+      ReplicationConfig newConfig = replicationConfigProvider.get();
+      final List<RemoteConfiguration> newValidDestinations =
+          configParser.parseRemotes(newConfig.getConfig());
+      loadedConfig = newConfig;
+      loadedConfigVersion = newConfig.getVersion();
+      lastFailedConfigVersion = "";
+      eventBus.post(newValidDestinations);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot reload replication configuration: keeping existing settings");
+      lastFailedConfigVersion = pendingConfigVersion;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
index 98f364d..5bae0af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -31,11 +31,10 @@
   private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory;
   private volatile long secureCredentialsFactoryLoadTs;
   private final SitePaths site;
-  private ReplicationFileBasedConfig config;
+  private ReplicationConfig config;
 
   @Inject
-  public AutoReloadSecureCredentialsFactoryDecorator(
-      SitePaths site, ReplicationFileBasedConfig config)
+  public AutoReloadSecureCredentialsFactoryDecorator(SitePaths site, ReplicationConfig config)
       throws ConfigInvalidException, IOException {
     this.site = site;
     this.config = config;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
new file mode 100644
index 0000000..29ea706
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+
+/** Parser for parsing {@link Config} to a collection of {@link RemoteConfiguration} objects */
+public interface ConfigParser {
+
+  /**
+   * parse the new replication config
+   *
+   * @param config new configuration to parse
+   * @return List of parsed {@link RemoteConfiguration}
+   * @throws ConfigInvalidException if the new configuration is not valid.
+   */
+  List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index a8dede3..fa26e82 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
@@ -31,7 +31,7 @@
   }
 
   private final RemoteConfig config;
-  private final ReplicationConfig replicationConfig;
+  private final DestinationsCollection destinations;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
   private final Project.NameKey project;
   private final String head;
@@ -39,21 +39,20 @@
   @Inject
   CreateProjectTask(
       RemoteConfig config,
-      ReplicationConfig replicationConfig,
+      DestinationsCollection destinations,
       DynamicItem<AdminApiFactory> adminApiFactory,
       @Assisted Project.NameKey project,
       @Assisted String head) {
     this.config = config;
-    this.replicationConfig = replicationConfig;
+    this.destinations = destinations;
     this.adminApiFactory = adminApiFactory;
     this.project = project;
     this.head = head;
   }
 
   public boolean create() {
-    return replicationConfig
-        .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values()
-        .stream()
+    return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION)
+        .values().stream()
         .map(u -> createProject(u, project, head))
         .reduce(true, (a, b) -> a && b);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index f9b2ad7..4617672 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 3b8208b..35470eb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,13 +26,13 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.reviewdb.client.AccountGroup;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.PluginUser;
 import com.google.gerrit.server.account.GroupBackend;
@@ -60,7 +60,6 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -554,6 +553,7 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
+              replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -580,36 +580,21 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      replicationTasksStorage.get().start(op);
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne task) {
+  void notifyFinished(PushOne op) {
     synchronized (stateLock) {
-      inFlight.remove(task.getURI());
-      if (!task.wasCanceled()) {
-        for (String ref : task.getRefs()) {
-          if (!refHasPendingPush(task.getURI(), ref)) {
-            replicationTasksStorage
-                .get()
-                .delete(
-                    new ReplicateRefUpdate(
-                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
-          }
-        }
+      if (!op.isRetrying()) {
+        replicationTasksStorage.get().finish(op);
       }
+      inFlight.remove(op.getURI());
     }
   }
 
-  private boolean refHasPendingPush(URIish opUri, String ref) {
-    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
-  }
-
-  private boolean pushContainsRef(PushOne op, String ref) {
-    return op != null && op.getRefs().contains(ref);
-  }
-
   boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
     return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
   }
@@ -634,21 +619,7 @@
   }
 
   boolean isSingleProjectMatch() {
-    List<String> projects = config.getProjects();
-    boolean ret = (projects.size() == 1);
-    if (ret) {
-      String projectMatch = projects.get(0);
-      if (ReplicationFilter.getPatternType(projectMatch)
-          != ReplicationFilter.PatternType.EXACT_MATCH) {
-        // projectMatch is either regular expression, or wild-card.
-        //
-        // Even though they might refer to a single project now, they need not
-        // after new projects have been created. Hence, we do not treat them as
-        // matching a single project.
-        ret = false;
-      }
-    }
-    return ret;
+    return config.isSingleProjectMatch();
   }
 
   boolean wouldPushRef(String ref) {
@@ -775,6 +746,10 @@
     return config.getDelay() * 1000L;
   }
 
+  int getSlowLatencyThreshold() {
+    return config.getSlowLatencyThreshold();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
@@ -794,7 +769,7 @@
       ReplicationScheduledEvent event =
           new ReplicationScheduledEvent(project.get(), ref, targetNode);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
@@ -808,7 +783,7 @@
       RefReplicatedEvent event =
           new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
new file mode 100644
index 0000000..4050c9c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -0,0 +1,103 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * Implementation of {@link ConfigParser} for parsing {@link Config} to a collection of {@link
+ * DestinationConfiguration} objects
+ */
+public class DestinationConfigParser implements ConfigParser {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.replication.ConfigParser#parseRemotes(org.eclipse.jgit.lib.Config)
+   */
+  @Override
+  public List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException {
+
+    if (config.getSections().isEmpty()) {
+      logger.atWarning().log("Replication config does not exist or it's empty; not replicating");
+      return Collections.emptyList();
+    }
+
+    boolean defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+
+    ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
+    for (RemoteConfig c : allRemotes(config)) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // If destination for push is not set assume equal to source.
+      for (RefSpec ref : c.getPushRefSpecs()) {
+        if (ref.getDestination() == null) {
+          ref.setDestination(ref.getSource());
+        }
+      }
+
+      if (c.getPushRefSpecs().isEmpty()) {
+        c.addPushRefSpec(
+            new RefSpec()
+                .setSourceDestination("refs/*", "refs/*")
+                .setForceUpdate(defaultForceUpdate));
+      }
+
+      DestinationConfiguration destinationConfiguration = new DestinationConfiguration(c, config);
+
+      if (!destinationConfiguration.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format(
+                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
+                    c.getName(), u, config));
+          }
+        }
+      }
+
+      confs.add(destinationConfiguration);
+    }
+
+    return confs.build();
+  }
+
+  private static List<RemoteConfig> allRemotes(Config cfg) throws ConfigInvalidException {
+    Set<String> names = cfg.getSubsections("remote");
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        result.add(new RemoteConfig(cfg, name));
+      } catch (URISyntaxException e) {
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg), e);
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index f688cfc..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -16,13 +16,16 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-public class DestinationConfiguration {
+public class DestinationConfiguration implements RemoteConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
+  private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,7 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
+  private final int slowLatencyThreshold;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -67,16 +71,29 @@
     maxRetries =
         getInt(
             remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+
+    slowLatencyThreshold =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "remote",
+                remoteConfig.getName(),
+                "slowLatencyThreshold",
+                DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+                TimeUnit.SECONDS);
   }
 
+  @Override
   public int getDelay() {
     return delay;
   }
 
+  @Override
   public int getRescheduleDelay() {
     return rescheduleDelay;
   }
 
+  @Override
   public int getRetryDelay() {
     return retryDelay;
   }
@@ -93,26 +110,32 @@
     return lockErrorMaxRetries;
   }
 
+  @Override
   public ImmutableList<String> getUrls() {
     return urls;
   }
 
+  @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
   }
 
+  @Override
   public ImmutableList<String> getProjects() {
     return projects;
   }
 
+  @Override
   public ImmutableList<String> getAuthGroupNames() {
     return authGroupNames;
   }
 
+  @Override
   public String getRemoteNameStyle() {
     return remoteNameStyle;
   }
 
+  @Override
   public boolean replicatePermissions() {
     return replicatePermissions;
   }
@@ -129,10 +152,12 @@
     return replicateHiddenProjects;
   }
 
+  @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
   }
 
+  @Override
   public int getMaxRetries() {
     return maxRetries;
   }
@@ -140,4 +165,9 @@
   private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
     return cfg.getInt("remote", rc.getName(), name, defValue);
   }
+
+  @Override
+  public int getSlowLatencyThreshold() {
+    return slowLatencyThreshold;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
new file mode 100644
index 0000000..eaf5b27
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -0,0 +1,269 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Factory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class DestinationsCollection implements ReplicationDestinations {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Factory destinationFactory;
+  private final Provider<ReplicationQueue> replicationQueue;
+  private volatile List<Destination> destinations;
+  private boolean shuttingDown;
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
+  @Inject
+  public DestinationsCollection(
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
+      ReplicationConfig replicationConfig,
+      ConfigParser configParser,
+      EventBus eventBus)
+      throws ConfigInvalidException {
+    this.destinationFactory = destinationFactory;
+    this.replicationQueue = replicationQueue;
+    this.destinations =
+        allDestinations(
+            destinationFactory, configParser.parseRemotes(replicationConfig.getConfig()));
+    eventBus.register(this);
+  }
+
+  @Override
+  public Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
+    if (getAll(filterType).isEmpty()) {
+      return ImmutableMultimap.of();
+    }
+
+    SetMultimap<Destination, URIish> uris = HashMultimap.create();
+    for (Destination config : getAll(filterType)) {
+      if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) {
+        continue;
+      }
+
+      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
+        continue;
+      }
+
+      boolean adminURLUsed = false;
+
+      for (String url : config.getAdminUrls()) {
+        if (Strings.isNullOrEmpty(url)) {
+          continue;
+        }
+
+        URIish uri;
+        try {
+          uri = new URIish(url);
+        } catch (URISyntaxException e) {
+          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          continue;
+        }
+
+        if (!isGerrit(uri) && !isGerritHttp(uri)) {
+          String path =
+              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
+          if (path == null) {
+            repLog.warn("adminURL {} does not contain ${name}", uri);
+            continue;
+          }
+
+          uri = uri.setPath(path);
+          if (!isSSH(uri)) {
+            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            continue;
+          }
+        }
+        uris.put(config, uri);
+        adminURLUsed = true;
+      }
+
+      if (!adminURLUsed) {
+        for (URIish uri : config.getURIs(projectName, "*")) {
+          uris.put(config, uri);
+        }
+      }
+    }
+    return uris;
+  }
+
+  @Override
+  public List<Destination> getAll(FilterType filterType) {
+    Predicate<? super Destination> filter;
+    switch (filterType) {
+      case PROJECT_CREATION:
+        filter = dest -> dest.isCreateMissingRepos();
+        break;
+      case PROJECT_DELETION:
+        filter = dest -> dest.isReplicateProjectDeletions();
+        break;
+      case ALL:
+      default:
+        filter = dest -> true;
+        break;
+    }
+    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
+  }
+
+  @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    List<Destination> dests = new ArrayList<>();
+    for (Destination dest : getAll(FilterType.ALL)) {
+      if (dest.wouldPush(uri, project, ref)) {
+        dests.add(dest);
+      }
+    }
+    return dests;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return destinations.isEmpty();
+  }
+
+  @Override
+  public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
+    for (Destination cfg : destinations) {
+      cfg.start(workQueue);
+    }
+  }
+
+  /* shutdown() cannot be set as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when enabling the drain of replication events before
+   * shutdown.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never hold waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    synchronized (this) {
+      shuttingDown = true;
+    }
+
+    int discarded = 0;
+    for (Destination cfg : destinations) {
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
+    }
+    return discarded;
+  }
+
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        Thread.sleep(destination.getReplicationDelayMilliseconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  @Subscribe
+  public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
+    if (shuttingDown) {
+      logger.atWarning().log("Shutting down: configuration reload ignored");
+      return;
+    }
+
+    try {
+      replicationQueue.get().stop();
+      destinations = allDestinations(destinationFactory, remoteConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
+    } finally {
+      replicationQueue.get().start();
+    }
+  }
+
+  private List<Destination> allDestinations(
+      Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
+
+    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
+    for (RemoteConfiguration c : remoteConfigurations) {
+      if (c instanceof DestinationConfiguration) {
+        dest.add(destinationFactory.create((DestinationConfiguration) c));
+      }
+    }
+    return dest.build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
new file mode 100644
index 0000000..4cc9974
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -0,0 +1,182 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.Files.getNameWithoutExtension;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.internal.storage.file.FileSnapshot;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+public class FanoutReplicationConfig implements ReplicationConfig {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final ReplicationFileBasedConfig replicationConfig;
+  private final Config config;
+  private final Path remoteConfigsDirPath;
+
+  @Inject
+  public FanoutReplicationConfig(SitePaths site, @PluginData Path pluginDataDir)
+      throws IOException, ConfigInvalidException {
+
+    remoteConfigsDirPath = site.etc_dir.resolve("replication");
+    replicationConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+    config = replicationConfig.getConfig();
+    removeRemotes(config);
+
+    try (Stream<Path> files = Files.list(remoteConfigsDirPath)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(FanoutReplicationConfig::isConfig)
+          .map(FanoutReplicationConfig::loadConfig)
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .filter(FanoutReplicationConfig::isValid)
+          .forEach(cfg -> addRemoteConfig(cfg, config));
+    } catch (IllegalStateException e) {
+      throw new ConfigInvalidException(e.getMessage());
+    }
+  }
+
+  private static void removeRemotes(Config config) {
+    Set<String> remoteNames = config.getSubsections("remote");
+    if (remoteNames.size() > 0) {
+      logger.atSevere().log(
+          "When replication directory is present replication.config file cannot contain remote configuration. Ignoring: %s",
+          String.join(",", remoteNames));
+
+      for (String name : remoteNames) {
+        config.unsetSection("remote", name);
+      }
+    }
+  }
+
+  private static void addRemoteConfig(FileBasedConfig source, Config destination) {
+    String remoteName = getNameWithoutExtension(source.getFile().getName());
+    for (String name : source.getNames("remote")) {
+      destination.setStringList(
+          "remote",
+          remoteName,
+          name,
+          Lists.newArrayList(source.getStringList("remote", null, name)));
+    }
+  }
+
+  private static boolean isValid(Config cfg) {
+    if (cfg.getSections().size() != 1 || !cfg.getSections().contains("remote")) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s must contain only one remote section.", cfg);
+      return false;
+    }
+    if (cfg.getSubsections("remote").size() > 0) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s cannot contain remote subsections.", cfg);
+      return false;
+    }
+
+    return true;
+  }
+
+  private static Optional<FileBasedConfig> loadConfig(Path path) {
+    FileBasedConfig cfg = new FileBasedConfig(path.toFile(), FS.DETECTED);
+    try {
+      cfg.load();
+    } catch (IOException | ConfigInvalidException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot load remote replication configuration file %s.", path);
+      return Optional.empty();
+    }
+    return Optional.of(cfg);
+  }
+
+  private static boolean isConfig(Path p) {
+    return p.toString().endsWith(".config");
+  }
+
+  @Override
+  public boolean isReplicateAllOnPluginStart() {
+    return replicationConfig.isReplicateAllOnPluginStart();
+  }
+
+  @Override
+  public boolean isDefaultForceUpdate() {
+    return replicationConfig.isDefaultForceUpdate();
+  }
+
+  @Override
+  public int getMaxRefsToLog() {
+    return replicationConfig.getMaxRefsToLog();
+  }
+
+  @Override
+  public Path getEventsDirectory() {
+    return replicationConfig.getEventsDirectory();
+  }
+
+  @Override
+  public int getSshConnectionTimeout() {
+    return replicationConfig.getSshConnectionTimeout();
+  }
+
+  @Override
+  public int getSshCommandTimeout() {
+    return replicationConfig.getSshCommandTimeout();
+  }
+
+  @Override
+  public String getVersion() {
+    Hasher hasher = Hashing.murmur3_128().newHasher();
+    hasher.putString(replicationConfig.getVersion(), UTF_8);
+    try (Stream<Path> files = Files.list(remoteConfigsDirPath)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(FanoutReplicationConfig::isConfig)
+          .sorted()
+          .map(Path::toFile)
+          .map(FileSnapshot::save)
+          .forEach(
+              fileSnapshot ->
+                  // hashCode is based on file size, file key and last modified time
+                  hasher.putInt(fileSnapshot.hashCode()));
+      return hasher.hash().toString();
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot list remote configuration files from %s. Returning replication.config file version",
+          remoteConfigsDirPath);
+      return replicationConfig.getVersion();
+    }
+  }
+
+  @Override
+  public Config getConfig() {
+    return config;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index eac56df..66130f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -18,8 +18,8 @@
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 6dcc80e..d195aa3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index 07978ab..9264d9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -40,11 +40,11 @@
   @Option(name = "--json", usage = "output in json format")
   private boolean json;
 
-  @Inject private ReplicationConfig config;
+  @Inject private ReplicationDestinations destinations;
 
   @Override
   protected void run() {
-    for (Destination d : config.getDestinations(FilterType.ALL)) {
+    for (Destination d : destinations.getAll(FilterType.ALL)) {
       if (matches(d.getRemoteConfigName())) {
         printRemote(d);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index aa6e16c..da960e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.File;
 import java.io.IOException;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java
new file mode 100644
index 0000000..e8d95ec
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@BindingAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MainReplicationConfig {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
new file mode 100644
index 0000000..a347f3a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.io.IOException;
+import java.util.Locale;
+
+/** Some NFS utilities */
+public class Nfs {
+  /**
+   * Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle
+   *
+   * @param throwable
+   * @return a boolean true if the throwable or a cause in its causal chain is a Stale NFS File
+   *     Handle
+   */
+  public static boolean isStaleFileHandleInCausalChain(Throwable throwable) {
+    while (throwable != null) {
+      if (throwable instanceof IOException && isStaleFileHandle((IOException) throwable)) {
+        return true;
+      }
+      throwable = throwable.getCause();
+    }
+    return false;
+  }
+
+  /**
+   * Determine if an IOException is a Stale NFS File Handle
+   *
+   * @param ioe
+   * @return a boolean true if the IOException is a Stale NFS FIle Handle
+   */
+  public static boolean isStaleFileHandle(IOException ioe) {
+    String msg = ioe.getMessage();
+    return msg != null && msg.toLowerCase(Locale.ROOT).matches(".*stale .*file .*handle.*");
+  }
+
+  public static <T extends Throwable> void throwIfNotStaleFileHandle(T e) throws T {
+    if (!isStaleFileHandleInCausalChain(e)) {
+      throw e;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
new file mode 100644
index 0000000..1007ae5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+/** Allows a queue activity to be observed */
+public interface ObservableQueue {
+  /**
+   * Indicates whether the observed queue is running
+   *
+   * @return true, when the queue is running, false otherwise
+   */
+  boolean isRunning();
+
+  /**
+   * Indicates whether the observed queue is replaying queued events
+   *
+   * @return true, when the queue is replaying, false otherwise
+   */
+  boolean isReplaying();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 833b02b..4f60319 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.Nullable;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b488264..b53bf69 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toMap;
 
@@ -24,13 +25,13 @@
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
@@ -83,7 +84,7 @@
  * <p>Instance members are protected by the lock within PushQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
  */
-class PushOne implements ProjectRunnable, CanceledWhileRunning {
+class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates {
   private final ReplicationStateListener stateLog;
   static final String ALL_REFS = "..all..";
   static final String ID_MDC_KEY = "pushOneId";
@@ -217,6 +218,10 @@
     return maxRetries == 0 || retryCount <= maxRetries;
   }
 
+  private void retryDone() {
+    this.retrying = false;
+  }
+
   void canceledByReplication() {
     canceled = true;
   }
@@ -225,7 +230,8 @@
     return canceled || canceledWhileRunning.get();
   }
 
-  URIish getURI() {
+  @Override
+  public URIish getURI() {
     return uri;
   }
 
@@ -239,7 +245,8 @@
     }
   }
 
-  Set<String> getRefs() {
+  @Override
+  public Set<String> getRefs() {
     return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
   }
 
@@ -334,14 +341,20 @@
     }
 
     repLog.info("Replication to {} started...", uri);
-    Timer1.Context context = metrics.start(config.getName());
+    Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
-      long startedAt = context.getStartTime();
+      long startedAt = destinationContext.getStartTime();
       long delay = NANOSECONDS.toMillis(startedAt - createdAt);
       metrics.record(config.getName(), delay, retryCount);
       git = gitManager.openRepository(projectName);
       runImpl();
-      long elapsed = NANOSECONDS.toMillis(context.stop());
+      long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
+
+      if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
+        metrics.recordSlowProjectReplication(
+            config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
+      }
+      retryDone();
       repLog.info(
           "Replication to {} completed in {}ms, {}ms delay, {} retries",
           uri,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index fccdb7b..d1ab790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -14,9 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.util.Objects;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
@@ -45,11 +46,40 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicatedEvent)) {
+      return false;
+    }
+    RefReplicatedEvent event = (RefReplicatedEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    if (!Objects.equals(event.targetNode, this.targetNode)) {
+      return false;
+    }
+    if (!Objects.equals(event.status, this.status)) {
+      return false;
+    }
+    if (!Objects.equals(event.refStatus, this.refStatus)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 4789a96..e663194 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
 
 public class RefReplicationDoneEvent extends RefEvent {
   public static final String TYPE = "ref-replication-done";
@@ -33,11 +34,35 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicationDoneEvent)) {
+      return false;
+    }
+
+    RefReplicationDoneEvent event = (RefReplicationDoneEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    if (event.nodesCount != this.nodesCount) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
new file mode 100644
index 0000000..b66e73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+/** Remote configuration for a replication endpoint */
+public interface RemoteConfiguration {
+  /**
+   * Time to wait before scheduling a remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getDelay();
+  /**
+   * Time to wait before rescheduling a remote replication operation, which might have failed the
+   * first time round. Setting to 0 effectively disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRescheduleDelay();
+  /**
+   * Time to wait before retrying a failed remote replication operation, Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRetryDelay();
+  /**
+   * List of the remote endpoint addresses used for replication.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getUrls();
+  /**
+   * List of alternative remote endpoint addresses, used for admin operations, such as repository
+   * creation
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getAdminUrls();
+  /**
+   * List of repositories that should be replicated
+   *
+   * @return list of project strings
+   */
+  ImmutableList<String> getProjects();
+  /**
+   * List of groups that should be used to access the repositories.
+   *
+   * @return list of group strings
+   */
+  ImmutableList<String> getAuthGroupNames();
+  /**
+   * Influence how the name of the remote repository should be computed.
+   *
+   * @return a string representing a remote style name
+   */
+  String getRemoteNameStyle();
+  /**
+   * If true, permissions-only projects and the refs/meta/config branch will also be replicated
+   *
+   * @return a string representing a remote style name
+   */
+  boolean replicatePermissions();
+  /**
+   * the JGIT remote configuration representing the replication for this endpoint
+   *
+   * @return The remote config {@link RemoteConfig}
+   */
+  RemoteConfig getRemoteConfig();
+  /**
+   * Number of times to retry a replication operation
+   *
+   * @return the number of retries
+   */
+  int getMaxRetries();
+
+  /**
+   * the time duration after which the replication for a project should be considered “slow”
+   *
+   * @return the slow latency threshold
+   */
+  int getSlowLatencyThreshold();
+
+  /**
+   * Whether the remote configuration is for a single project only
+   *
+   * @return true, when configuration is for a single project, false otherwise
+   */
+  default boolean isSingleProjectMatch() {
+    List<String> projects = getProjects();
+    boolean ret = (projects.size() == 1);
+    if (ret) {
+      String projectMatch = projects.get(0);
+      if (ReplicationFilter.getPatternType(projectMatch)
+          != ReplicationFilter.PatternType.EXACT_MATCH) {
+        // projectMatch is either regular expression, or wild-card.
+        //
+        // Even though they might refer to a single project now, they need not
+        // after new projects have been created. Hence, we do not treat them as
+        // matching a single project.
+        ret = false;
+      }
+    }
+    return ret;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index e685215..7538298 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.IOException;
 import java.io.OutputStream;
 import org.eclipse.jgit.transport.URIish;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index ccdead8..b978952 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -11,46 +11,81 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.common.collect.Multimap;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.git.WorkQueue;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
-import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.lib.Config;
 
+/** Configuration of all the replication end points. */
 public interface ReplicationConfig {
 
+  /** Filter for accessing replication projects. */
   enum FilterType {
     PROJECT_CREATION,
     PROJECT_DELETION,
     ALL
   }
 
-  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
-
-  List<Destination> getDestinations(FilterType filterType);
-
-  Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
-
+  /**
+   * Returns current replication configuration of whether to replicate or not all the projects when
+   * the plugin starts.
+   *
+   * @return true if replication at plugin start, false otherwise.
+   */
   boolean isReplicateAllOnPluginStart();
 
+  /**
+   * Returns the default behaviour of the replication plugin when pushing to remote replication
+   * ends. Even though the property name has the 'update' suffix, it actually refers to Git push
+   * operation and not to a Git update.
+   *
+   * @return true if forced push is the default, false otherwise.
+   */
   boolean isDefaultForceUpdate();
 
+  /**
+   * Returns the maximum number of ref-specs to log into the replication_log whenever a push
+   * operation is completed against a replication end.
+   *
+   * @return maximum number of refs to log, zero if unlimited.
+   */
   int getMaxRefsToLog();
 
-  boolean isEmpty();
-
+  /**
+   * Configured location where the replication events are stored on the filesystem for being resumed
+   * and kept across restarts.
+   *
+   * @return path to store persisted events.
+   */
   Path getEventsDirectory();
 
-  int shutdown();
-
-  void startup(WorkQueue workQueue);
-
+  /**
+   * Timeout for establishing SSH connection to remote.
+   *
+   * @return connection timeout, zero if infinite.
+   */
   int getSshConnectionTimeout();
 
+  /**
+   * Timeout for executing an SSH command on remote.
+   *
+   * @return command timeout, zero if infinite.
+   */
   int getSshCommandTimeout();
+
+  /**
+   * Current logical version string of the current configuration loaded in memory, depending on the
+   * actual implementation of the configuration on the persistent storage.
+   *
+   * @return current logical version number.
+   */
+  String getVersion();
+
+  /**
+   * Return a copy of the current config.
+   *
+   * @return the config.
+   */
+  Config getConfig();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
new file mode 100644
index 0000000..18ccc66
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.Multimap;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationDestinations {
+
+  /**
+   * Return all the URIs associated to a project and a filter criteria.
+   *
+   * @param remoteName name of the replication end or empty if selecting all ends.
+   * @param projectName name of the project
+   * @param filterType type of filter criteria for selecting projects
+   * @return the multi-map of destinations and the associated replication URIs
+   */
+  Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
+
+  /**
+   * List of currently active replication destinations.
+   *
+   * @param filterType type project filtering
+   * @return the list of active destinations
+   */
+  List<Destination> getAll(FilterType filterType);
+
+  /**
+   * Return the active replication destinations for a uri/project/ref triplet.
+   *
+   * @param uriish uri of the destinations
+   * @param project name of the project
+   * @param ref ref name
+   * @return the list of active destinations
+   */
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
+  /** @return true if there are no destinations, false otherwise. */
+  boolean isEmpty();
+
+  /**
+   * Start replicating to all destinations.
+   *
+   * @param workQueue execution queue for scheduling the replication events.
+   */
+  void startup(WorkQueue workQueue);
+
+  /**
+   * Stop the replication to all destinations.
+   *
+   * @return number of events cancelled during shutdown.
+   */
+  int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 3094929..e99f6b1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -13,52 +13,22 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.SetMultimap;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
 
-@Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
-  private List<Destination> destinations;
   private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
@@ -70,188 +40,24 @@
   private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(
-      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
-      throws ConfigInvalidException, IOException {
+  public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) {
     this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
-    this.destinations = allDestinations(destinationFactory);
-    this.pluginDataDir = pluginDataDir;
-  }
-
-  @Override
-  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
-    List<Destination> dests = new ArrayList<>();
-    for (Destination dest : getDestinations(FilterType.ALL)) {
-      if (dest.wouldPush(uri, project, ref)) {
-        dests.add(dest);
-      }
-    }
-    return dests;
-  }
-
-  /*
-   * (non-Javadoc)
-   * @see
-   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations
-   * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType)
-   */
-  @Override
-  public List<Destination> getDestinations(FilterType filterType) {
-    Predicate<? super Destination> filter;
-    switch (filterType) {
-      case PROJECT_CREATION:
-        filter = dest -> dest.isCreateMissingRepos();
-        break;
-      case PROJECT_DELETION:
-        filter = dest -> dest.isReplicateProjectDeletions();
-        break;
-      case ALL:
-      default:
-        filter = dest -> true;
-        break;
-    }
-    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
-  }
-
-  private List<Destination> allDestinations(Destination.Factory destinationFactory)
-      throws ConfigInvalidException, IOException {
-    if (!config.getFile().exists()) {
-      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-    if (config.getFile().length() == 0) {
-      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-
     try {
       config.load();
     } catch (ConfigInvalidException e) {
-      throw new ConfigInvalidException(
-          String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e);
+      repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e);
     } catch (IOException e) {
-      throw new IOException(
-          String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
+      repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e);
     }
-
-    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
-
-    defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
-
-    maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
-
-    sshCommandTimeout =
-        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
-    sshConnectionTimeout =
-        (int)
-            ConfigUtil.getTimeUnit(
-                config,
-                "gerrit",
-                null,
-                "sshConnectionTimeout",
-                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
-                MILLISECONDS);
-
-    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
-    for (RemoteConfig c : allRemotes(config)) {
-      if (c.getURIs().isEmpty()) {
-        continue;
-      }
-
-      // If destination for push is not set assume equal to source.
-      for (RefSpec ref : c.getPushRefSpecs()) {
-        if (ref.getDestination() == null) {
-          ref.setDestination(ref.getSource());
-        }
-      }
-
-      if (c.getPushRefSpecs().isEmpty()) {
-        c.addPushRefSpec(
-            new RefSpec()
-                .setSourceDestination("refs/*", "refs/*")
-                .setForceUpdate(defaultForceUpdate));
-      }
-
-      Destination destination = destinationFactory.create(new DestinationConfiguration(c, config));
-
-      if (!destination.isSingleProjectMatch()) {
-        for (URIish u : c.getURIs()) {
-          if (u.getPath() == null || !u.getPath().contains("${name}")) {
-            throw new ConfigInvalidException(
-                String.format(
-                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
-                    c.getName(), u, config.getFile()));
-          }
-        }
-      }
-
-      dest.add(destination);
-    }
-    return dest.build();
+    this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
+    this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+    this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
+    this.pluginDataDir = pluginDataDir;
   }
 
-  @Override
-  public Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    if (getDestinations(filterType).isEmpty()) {
-      return ImmutableMultimap.of();
-    }
-
-    SetMultimap<Destination, URIish> uris = HashMultimap.create();
-    for (Destination config : getDestinations(filterType)) {
-      if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) {
-        continue;
-      }
-
-      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
-        continue;
-      }
-
-      boolean adminURLUsed = false;
-
-      for (String url : config.getAdminUrls()) {
-        if (Strings.isNullOrEmpty(url)) {
-          continue;
-        }
-
-        URIish uri;
-        try {
-          uri = new URIish(url);
-        } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
-          continue;
-        }
-
-        if (!isGerrit(uri) && !isGerritHttp(uri)) {
-          String path =
-              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
-          if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
-            continue;
-          }
-
-          uri = uri.setPath(path);
-          if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
-            continue;
-          }
-        }
-        uris.put(config, uri);
-        adminURLUsed = true;
-      }
-
-      if (!adminURLUsed) {
-        for (URIish uri : config.getURIs(projectName, "*")) {
-          uris.put(config, uri);
-        }
-      }
-    }
-    return uris;
-  }
-
-  static String replaceName(String in, String name, boolean keyIsOptional) {
+  public static String replaceName(String in, String name, boolean keyIsOptional) {
     String key = "${name}";
     int n = in.indexOf(key);
     if (0 <= n) {
@@ -284,28 +90,6 @@
     return maxRefsToLog;
   }
 
-  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
-    Set<String> names = cfg.getSubsections("remote");
-    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      try {
-        result.add(new RemoteConfig(cfg, name));
-      } catch (URISyntaxException e) {
-        throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
-      }
-    }
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty()
-   */
-  @Override
-  public boolean isEmpty() {
-    return destinations.isEmpty();
-  }
-
   @Override
   public Path getEventsDirectory() {
     String eventsDirectory = config.getString("replication", null, "eventsDirectory");
@@ -320,66 +104,13 @@
   }
 
   @Override
-  public int shutdown() {
-    int discarded = 0;
-    for (Destination cfg : destinations) {
-      try {
-        drainReplicationEvents(cfg);
-      } catch (EventQueueNotEmptyException e) {
-        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
-      } finally {
-        discarded += cfg.shutdown();
-      }
-    }
-    return discarded;
-  }
-
-  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
-    int drainQueueAttempts = destination.getDrainQueueAttempts();
-    if (drainQueueAttempts == 0) {
-      return;
-    }
-    int pending = destination.getQueueInfo().pending.size();
-    int inFlight = destination.getQueueInfo().inFlight.size();
-
-    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
-      try {
-        logger.atInfo().log(
-            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
-            inFlight, pending);
-        Thread.sleep(destination.getReplicationDelayMilliseconds());
-      } catch (InterruptedException ie) {
-        logger.atWarning().withCause(ie).log(
-            "Wait for replication events to drain has been interrupted");
-      }
-      pending = destination.getQueueInfo().pending.size();
-      inFlight = destination.getQueueInfo().inFlight.size();
-      drainQueueAttempts--;
-    }
-
-    if (pending > 0 || inFlight > 0) {
-      throw new EventQueueNotEmptyException(
-          String.format("Pending: %d - InFlight: %d", pending, inFlight));
-    }
-  }
-
-  public static class EventQueueNotEmptyException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EventQueueNotEmptyException(String errorMessage) {
-      super(errorMessage);
-    }
-  }
-
-  FileBasedConfig getConfig() {
+  public Config getConfig() {
     return config;
   }
 
   @Override
-  public void startup(WorkQueue workQueue) {
-    for (Destination cfg : destinations) {
-      cfg.start(workQueue);
-    }
+  public String getVersion() {
+    return Long.toString(config.getFile().lastModified());
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 05bbb03..5b4204e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.util.Collections;
 import java.util.List;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
index afc7926..1bc17ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
@@ -14,11 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.Histogram3;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -27,10 +30,37 @@
   private final Timer1<String> executionTime;
   private final Histogram1<String> executionDelay;
   private final Histogram1<String> executionRetries;
+  private final Histogram3<Integer, String, String> slowProjectReplicationLatency;
 
   @Inject
-  ReplicationMetrics(MetricMaker metricMaker) {
-    Field<String> DEST_FIELD = Field.ofString("destination");
+  ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> DEST_FIELD =
+        Field.ofString(
+                "destination",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("destination", fieldValue)))
+            .build();
+
+    Field<String> PROJECT_FIELD =
+        Field.ofString(
+                "project",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("project", fieldValue)))
+            .build();
+
+    Field<Integer> SLOW_THRESHOLD_FIELD =
+        Field.ofInteger(
+                "slow_threshold",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(
+                            PluginMetadata.create("slow_threshold", fieldValue.toString())))
+            .build();
 
     executionTime =
         metricMaker.newTimer(
@@ -55,6 +85,17 @@
                 .setCumulative()
                 .setUnit("retries"),
             DEST_FIELD);
+
+    slowProjectReplicationLatency =
+        metricMaker.newHistogram(
+            "latency_slower_than_threshold" + "",
+            new Description(
+                    "latency for project to destination, where latency was slower than threshold")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SLOW_THRESHOLD_FIELD,
+            PROJECT_FIELD,
+            DEST_FIELD);
   }
 
   /**
@@ -63,7 +104,7 @@
    * @param name the destination name.
    * @return the timer context.
    */
-  Timer1.Context start(String name) {
+  Timer1.Context<String> start(String name) {
     return executionTime.start(name);
   }
 
@@ -78,4 +119,17 @@
     executionDelay.record(name, delay);
     executionRetries.record(name, retries);
   }
+
+  /**
+   * Record replication latency for project to destination, where latency was slower than threshold
+   *
+   * @param destinationName the destination name.
+   * @param projectName the project name.
+   * @param slowThreshold replication initialDelay in milliseconds.
+   * @param latency number of retries.
+   */
+  void recordSlowProjectReplication(
+      String destinationName, String projectName, Integer slowThreshold, long latency) {
+    slowProjectReplicationLatency.record(slowThreshold, destinationName, projectName, latency);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 835d068..c2b96a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -23,18 +24,38 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.util.FS;
 
 class ReplicationModule extends AbstractModule {
+  private final SitePaths site;
+  private final Path cfgPath;
+
+  @Inject
+  public ReplicationModule(SitePaths site) {
+    this.site = site;
+    cfgPath = site.etc_dir.resolve("replication.config");
+  }
+
   @Override
   protected void configure() {
     install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(ObservableQueue.class).to(ReplicationQueue.class);
     bind(LifecycleListener.class)
         .annotatedWith(UniqueAnnotations.create())
         .to(ReplicationQueue.class);
@@ -57,7 +78,22 @@
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
 
-    bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+    bind(EventBus.class).in(Scopes.SINGLETON);
+    bind(ReplicationDestinations.class).to(DestinationsCollection.class);
+    bind(ConfigParser.class).to(DestinationConfigParser.class).in(Scopes.SINGLETON);
+
+    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+      bind(ReplicationConfig.class)
+          .annotatedWith(MainReplicationConfig.class)
+          .to(getReplicationConfigClass());
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class).in(Scopes.SINGLETON);
+      bind(LifecycleListener.class)
+          .annotatedWith(UniqueAnnotations.create())
+          .to(AutoReloadConfigDecorator.class);
+    } else {
+      bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
@@ -68,4 +104,22 @@
 
     bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
+
+  private FileBasedConfig getReplicationConfig() {
+    File replicationConfigFile = cfgPath.toFile();
+    FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+    }
+    return config;
+  }
+
+  private Class<? extends ReplicationConfig> getReplicationConfigClass() {
+    if (Files.exists(site.etc_dir.resolve("replication"))) {
+      return FanoutReplicationConfig.class;
+    }
+    return ReplicationFileBasedConfig.class;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 51f0c12..4625407 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,15 +17,16 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
@@ -40,7 +41,8 @@
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
-    implements LifecycleListener,
+    implements ObservableQueue,
+        LifecycleListener,
         GitReferenceUpdatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
@@ -51,7 +53,7 @@
 
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
-  private final ReplicationConfig config;
+  private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
@@ -60,13 +62,13 @@
   @Inject
   ReplicationQueue(
       WorkQueue wq,
-      ReplicationConfig rc,
+      Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
     workQueue = wq;
     dispatcher = dis;
-    config = rc;
+    destinations = rd;
     stateLog = sl;
     replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
@@ -75,8 +77,9 @@
   @Override
   public void start() {
     if (!running) {
-      config.startup(workQueue);
+      destinations.get().startup(workQueue);
       running = true;
+      replicationTasksStorage.resetAll();
       Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
       t.setDaemon(true);
       t.start();
@@ -87,64 +90,60 @@
   @Override
   public void stop() {
     running = false;
-    int discarded = config.shutdown();
+    int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
   }
 
+  @Override
   public boolean isRunning() {
     return running;
   }
 
+  @Override
   public boolean isReplaying() {
     return replaying;
   }
 
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
-      return;
-    }
-
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project)) {
-        for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(
-                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
-        }
-      }
-    }
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName());
   }
 
-  private void onGitReferenceUpdated(String projectName, String refName) {
+  private void fire(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    fire(Project.nameKey(projectName), null, refName, state, false);
+    state.markAllPushTasksScheduled();
+  }
+
+  private void fire(
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(projectName);
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      pushReference(cfg, project, refName, state);
+    for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
+      pushReference(cfg, project, urlMatch, refName, state, now);
     }
-    state.markAllPushTasksScheduled();
   }
 
   private void fire(URIish uri, Project.NameKey project, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    for (Destination dest : config.getDestinations(uri, project, refName)) {
+    for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
       dest.schedule(project, refName, uri, state);
     }
     state.markAllPushTasksScheduled();
@@ -152,26 +151,29 @@
 
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
-    pushReference(cfg, project, refName, null);
+    pushReference(cfg, project, null, refName, null, true);
   }
 
   private void pushReference(
-      Destination cfg, Project.NameKey project, String refName, ReplicationState state) {
+      Destination cfg,
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     boolean withoutState = state == null;
     if (withoutState) {
       state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     }
-
     if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-      for (URIish uri : cfg.getURIs(project, null)) {
-        replicationTasksStorage.persist(
+      for (URIish uri : cfg.getURIs(project, urlMatch)) {
+        replicationTasksStorage.create(
             new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
-        cfg.schedule(project, refName, uri, state);
+        cfg.schedule(project, refName, uri, state, now);
       }
     } else {
       repLog.debug("Skipping ref {} on project {}", refName, project.get());
     }
-
     if (withoutState) {
       state.markAllPushTasksScheduled();
     }
@@ -181,13 +183,13 @@
     replaying = true;
     try {
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         if (t == null) {
           repLog.warn("Encountered null replication event in ReplicationTasksStorage");
           continue;
         }
         try {
-          fire(new URIish(t.uri), new Project.NameKey(t.project), t.ref);
+          fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
         } catch (URISyntaxException e) {
           repLog.error("Encountered malformed URI for persisted event %s", t);
         }
@@ -201,15 +203,15 @@
 
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
         .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
   }
 
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
         .forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
   }
 
@@ -219,7 +221,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName());
         eventsReplayed.add(eventKey);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index aa965fe..28f6b6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,6 +38,6 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index fd92e27..ead218b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -24,15 +24,38 @@
 import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.nio.file.DirectoryIteratorException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under <replication_data>/ref-updates where replication_data is
+ * determined by the replication.eventsDirectory config option and defaults to
+ * <site_dir>/data/replication. Atomic renames must be supported from anywhere within the store to
+ * anywhere within the store. This generally means that all the contents of the store needs to live
+ * on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <p><code>
+ *   .../building/<tmp_name>  new replication tasks under construction
+ *   .../running/<sha1>       running replication tasks
+ *   .../waiting/<sha1>       outstanding replication tasks
+ * </code>
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
 @Singleton
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -58,7 +81,9 @@
 
   private static final Gson GSON = new Gson();
 
-  private final Path refUpdates;
+  private final Path buildingUpdates;
+  private final Path runningUpdates;
+  private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
@@ -67,44 +92,50 @@
 
   @VisibleForTesting
   public ReplicationTasksStorage(Path refUpdates) {
-    this.refUpdates = refUpdates;
+    buildingUpdates = refUpdates.resolve("building");
+    runningUpdates = refUpdates.resolve("running");
+    waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String persist(ReplicateRefUpdate r) {
-    String json = GSON.toJson(r) + "\n";
-    String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote;
-    String eventKey = sha1(key).name();
-    Path file = refUpdates().resolve(eventKey);
-
-    if (Files.exists(file)) {
-      return eventKey;
-    }
-
-    try {
-      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.write(file, json.getBytes(UTF_8));
-    } catch (IOException e) {
-      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
-    }
-    return eventKey;
+  public synchronized String create(ReplicateRefUpdate r) {
+    return new Task(r).create();
   }
 
-  public void delete(ReplicateRefUpdate r) {
-    String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote;
-    String taskKey = sha1(key).name();
-    Path file = refUpdates().resolve(taskKey);
-
-    try {
-      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.delete(file);
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+  public synchronized void start(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).start();
     }
   }
 
-  public List<ReplicateRefUpdate> list() {
+  public synchronized void reset(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).reset();
+    }
+  }
+
+  public synchronized void resetAll() {
+    for (ReplicateRefUpdate r : listRunning()) {
+      new Task(r).reset();
+    }
+  }
+
+  public synchronized void finish(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).finish();
+    }
+  }
+
+  public List<ReplicateRefUpdate> listWaiting() {
+    return list(createDir(waitingUpdates));
+  }
+
+  public List<ReplicateRefUpdate> listRunning() {
+    return list(createDir(runningUpdates));
+  }
+
+  private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
       for (Path path : events) {
         if (Files.isRegularFile(path)) {
           try {
@@ -117,10 +148,17 @@
           } catch (IOException e) {
             logger.atSevere().withCause(e).log("Error when firing pending event %s", path);
           }
+        } else if (Files.isDirectory(path)) {
+          try {
+            results.addAll(list(path));
+          } catch (DirectoryIteratorException d) {
+            // iterating over the sub-directories is expected to have dirs disappear
+            Nfs.throwIfNotStaleFileHandle(d.getCause());
+          }
         }
       }
     } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error when firing pending events");
+      logger.atSevere().withCause(e).log("Error while listing tasks");
     }
     return results;
   }
@@ -130,11 +168,76 @@
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
-  private Path refUpdates() {
+  private static Path createDir(Path dir) {
     try {
-      return Files.createDirectories(refUpdates);
+      return Files.createDirectories(dir);
     } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+      throw new ProvisionException(String.format("Couldn't create %s", dir), e);
+    }
+  }
+
+  @VisibleForTesting
+  class Task {
+    public final ReplicateRefUpdate update;
+    public final String json;
+    public final String taskKey;
+    public final Path running;
+    public final Path waiting;
+
+    public Task(ReplicateRefUpdate update) {
+      this.update = update;
+      json = GSON.toJson(update) + "\n";
+      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      taskKey = sha1(key).name();
+      running = createDir(runningUpdates).resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
+    }
+
+    public String create() {
+      if (Files.exists(waiting)) {
+        return taskKey;
+      }
+
+      try {
+        Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
+        logger.atFine().log("CREATE %s %s", tmp, updateLog());
+        Files.write(tmp, json.getBytes(UTF_8));
+        logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog());
+        rename(tmp, waiting);
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log("Couldn't create task %s", json);
+      }
+      return taskKey;
+    }
+
+    public void start() {
+      rename(waiting, running);
+    }
+
+    public void reset() {
+      rename(running, waiting);
+    }
+
+    public void finish() {
+      try {
+        logger.atFine().log("DELETE %s %s", running, updateLog());
+        Files.delete(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+      }
+    }
+
+    private void rename(Path from, Path to) {
+      try {
+        logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index 70452b4..ffa6be1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
new file mode 100644
index 0000000..9c56c8e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.entities.Project;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.transport.URIish;
+
+/** A data container for updates to a single URI */
+public interface UriUpdates {
+  Project.NameKey getProjectNameKey();
+
+  URIish getURI();
+
+  String getRemoteName();
+
+  Set<String> getRefs();
+
+  default List<ReplicationTasksStorage.ReplicateRefUpdate> getReplicateRefUpdates() {
+    return getRefs().stream()
+        .map(
+            (ref) ->
+                new ReplicationTasksStorage.ReplicateRefUpdate(
+                    getProjectNameKey().get(), ref, getURI(), getRemoteName()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 194238c..0aad73b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -142,6 +142,11 @@
 	persisted events will not be deleted. When the plugin is started again,
 	it will trigger all replications found under this directory.
 
+	For replication to work, is is important that atomic renames be possible
+	from within any subdirectory of the eventsDirectory to within any other
+	subdirectory of the eventsDirectory. This generally means that the entire
+	contents of the eventsDirectory should live on the same filesystem.
+
 	When not set, defaults to the plugin's data directory.
 
 remote.NAME.url
@@ -442,6 +447,88 @@
 	By default, replicates without matching, i.e. replicates
 	everything to all remotes.
 
+remote.NAME.slowLatencyThreshold
+:	the time duration after which the replication of a project to this
+	destination will be considered "slow". A slow project replication
+	will cause additional metrics to be exposed for further investigation.
+	See [metrics.md](metrics.md) for further details.
+
+	default: 15 minutes
+
+Directory `replication`
+--------------------
+The optional directory `$site_path/etc/replication` contains Git-style
+config files that controls the replication settings for the replication
+plugin. When present all `remote` sections from `replication.config` file are
+ignored.
+
+Files are composed of one `remote` section. Multiple `remote` sections or any
+other section makes the file invalid and skipped by the replication plugin.
+File name defines remote section name. Each section provides common configuration
+settings for one or more destination URLs. For more details how to setup `remote`
+sections please refer to the `replication.config` section.
+
+### Configuration example:
+
+Static configuration in `$site_path/etc/replication.config`:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+```
+
+Remote sections in `$site_path/etc/replication` directory:
+
+* File `$site_path/etc/replication/host-one.config`
+
+ ```
+ [remote]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+ ```
+
+
+* File `$site_path/etc/replication/pubmirror.config`
+
+ ```
+  [remote]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+ ```
+
+Replication plugin resolves config files to the following configuration:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+
+[remote "host-one"]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+
+[remote "pubmirror"]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+```
+
 File `secure.config`
 --------------------
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..ef8b150
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,61 @@
+# Metrics
+
+Some metrics are emitted when replication occurs to a remote destination.
+The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
+as slow. This happens when the replication took longer than allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md))
+
+The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
+could potentially be considerably big.
+
+### Project level
+
+* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
+
+### Destination level
+
+* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
+* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
+* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
+
+### Example
+```
+# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_delay_destination summary
+plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0
+plugins_replication_replication_delay_destinationName_count 3.0
+
+# HELP plugins_replication_replication_retries_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_retries_destination summary
+plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0
+plugins_replication_replication_retries_destinationName_count 3.0
+
+# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer)
+# TYPE plugins_replication_replication_latency_destinationName summary
+plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002
+plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881
+plugins_replication_replication_latency_destinationName_count 2.0
+
+# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName 1.0
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 77dc1cc..2b6a8c4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -16,26 +16,28 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.file.Files.createTempDirectory;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+import com.google.inject.util.Providers;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Ignore
 public abstract class AbstractConfigTest {
@@ -43,6 +45,11 @@
   protected final SitePaths sitePaths;
   protected final Destination.Factory destinationFactoryMock;
   protected final Path pluginDataPath;
+  protected ReplicationQueue replicationQueueMock;
+  protected WorkQueue workQueueMock;
+  protected EventBus eventBus = new EventBus();
+  protected FakeExecutorService executorService = new FakeExecutorService();
+  protected ConfigParser configParser;
 
   static class FakeDestination extends Destination {
     public final DestinationConfiguration config;
@@ -53,11 +60,9 @@
     }
 
     private static Injector injectorMock() {
-      Injector injector = createNiceMock(Injector.class);
-      Injector childInjectorMock = createNiceMock(Injector.class);
-      expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock);
-      replay(childInjectorMock);
-      replay(injector);
+      Injector injector = mock(Injector.class);
+      Injector childInjectorMock = mock(Injector.class);
+      when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock);
       return injector;
     }
   }
@@ -66,21 +71,26 @@
     sitePath = createTempPath("site");
     sitePaths = new SitePaths(sitePath);
     pluginDataPath = createTempPath("data");
-    destinationFactoryMock = createMock(Destination.Factory.class);
+    destinationFactoryMock = mock(Destination.Factory.class);
+    configParser = new DestinationConfigParser();
   }
 
   @Before
   public void setup() {
-    expect(destinationFactoryMock.create(isA(DestinationConfiguration.class)))
-        .andAnswer(
-            new IAnswer<Destination>() {
+    when(destinationFactoryMock.create(any(DestinationConfiguration.class)))
+        .thenAnswer(
+            new Answer<Destination>() {
               @Override
-              public Destination answer() throws Throwable {
-                return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]);
+              public Destination answer(InvocationOnMock invocation) throws Throwable {
+                return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]);
               }
-            })
-        .anyTimes();
-    replay(destinationFactoryMock);
+            });
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+
+    workQueueMock = mock(WorkQueue.class);
+    when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService);
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
@@ -88,8 +98,12 @@
   }
 
   protected FileBasedConfig newReplicationConfig() {
+    return newReplicationConfig("replication.config");
+  }
+
+  protected FileBasedConfig newReplicationConfig(String path) {
     FileBasedConfig replicationConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+        new FileBasedConfig(sitePaths.etc_dir.resolve(path).toFile(), FS.DETECTED);
     return replicationConfig;
   }
 
@@ -113,4 +127,18 @@
 
     assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls);
   }
+
+  protected DestinationsCollection newDestinationsCollections(ReplicationConfig replicationConfig)
+      throws ConfigInvalidException {
+    return new DestinationsCollection(
+        destinationFactoryMock,
+        Providers.of(replicationQueueMock),
+        replicationConfig,
+        configParser,
+        eventBus);
+  }
+
+  protected ReplicationConfig newReplicationFileBasedConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 211cafa..b1b9453 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -15,192 +15,41 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 
-import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Provider;
 import com.google.inject.util.Providers;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AutoReloadConfigDecoratorTest extends AbstractConfigTest {
-  private AutoReloadConfigDecorator autoReloadConfig;
-  private ReplicationQueue replicationQueueMock;
-  private WorkQueue workQueueMock;
-  private FakeExecutorService executorService = new FakeExecutorService();
-
-  public class FakeExecutorService implements ScheduledExecutorService {
-    public Runnable refreshCommand;
-
-    @Override
-    public void shutdown() {}
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      return null;
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return false;
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return false;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-      return false;
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      return null;
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-      return null;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(
-        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return null;
-    }
-
-    @Override
-    public void execute(Runnable command) {}
-
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(
-        Runnable command, long initialDelay, long period, TimeUnit unit) {
-      refreshCommand = command;
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(
-        Runnable command, long initialDelay, long delay, TimeUnit unit) {
-      return null;
-    }
-  }
+  ReplicationConfig replicationConfig;
 
   public AutoReloadConfigDecoratorTest() throws IOException {
     super();
   }
 
-  @Override
-  @Before
-  public void setup() {
-    super.setup();
-
-    setupMocks();
-  }
-
-  private void setupMocks() {
-    replicationQueueMock = createNiceMock(ReplicationQueue.class);
-    expect(replicationQueueMock.isRunning()).andReturn(true);
-    replay(replicationQueueMock);
-
-    workQueueMock = createNiceMock(WorkQueue.class);
-    expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService);
-    replay(workQueueMock);
-  }
-
-  @Test
-  public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    String remoteName = "foo";
-    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
-    replicationConfig.setString("remote", remoteName, "url", remoteUrl);
-    replicationConfig.save();
-
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
-    assertThat(destinations).hasSize(1);
-    assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
-  }
-
   @Test
   public void shouldAutoReloadReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    replicationConfig.setBoolean("gerrit", null, "autoReload", true);
+    FileBasedConfig fileConfig = newReplicationConfig();
+    fileConfig.setBoolean("gerrit", null, "autoReload", true);
     String remoteName1 = "foo";
     String remoteUrl1 = "ssh://git@git.foo.com/${name}";
-    replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
-    replicationConfig.save();
+    fileConfig.setString("remote", remoteName1, "url", remoteUrl1);
+    fileConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    replicationConfig = newReplicationFileBasedConfig();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    newAutoReloadConfig(() -> newReplicationFileBasedConfig()).start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -208,45 +57,195 @@
 
     String remoteName2 = "bar";
     String remoteUrl2 = "ssh://git@git.bar.com/${name}";
-    replicationConfig.setString("remote", remoteName2, "url", remoteUrl2);
-    replicationConfig.save();
+    fileConfig.setString("remote", remoteName2, "url", remoteUrl2);
+    fileConfig.save();
     executorService.refreshCommand.run();
 
-    destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
 
   @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsAdded() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsRemoved() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    assertThat(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete())
+        .isTrue();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsModified() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "bar";
+    String remoteUrl2 = "ssh://git@git.bar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteUrl3 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig.setString("remote", null, "url", remoteUrl3);
+    remoteConfig.save();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl3);
+  }
+
+  @Test
   public void shouldNotAutoReloadReplicationConfigIfDisabled() throws Exception {
     String remoteName1 = "foo";
     String remoteUrl1 = "ssh://git@git.foo.com/${name}";
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    replicationConfig.setBoolean("gerrit", null, "autoReload", false);
-    replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
-    replicationConfig.save();
+    FileBasedConfig fileConfig = newReplicationConfig();
+    fileConfig.setBoolean("gerrit", null, "autoReload", false);
+    fileConfig.setString("remote", remoteName1, "url", remoteUrl1);
+    fileConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    replicationConfig = newReplicationFileBasedConfig();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
     TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
 
-    replicationConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}");
-    replicationConfig.save();
+    fileConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}");
+    fileConfig.save();
     executorService.refreshCommand.run();
 
-    assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations);
+    assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
+  }
+
+  private AutoReloadConfigDecorator newAutoReloadConfig(
+      Supplier<ReplicationConfig> configSupplier) {
+    AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            configParser,
+            new Provider<ReplicationConfig>() {
+
+              @Override
+              public ReplicationConfig get() {
+                return configSupplier.get();
+              }
+            },
+            eventBus,
+            Providers.of(replicationQueueMock));
+    return new AutoReloadConfigDecorator(
+        "replication",
+        workQueueMock,
+        newReplicationFileBasedConfig(),
+        autoReloadRunnable,
+        eventBus);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
new file mode 100644
index 0000000..e7339d9
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Provider;
+import com.google.inject.util.Providers;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoReloadRunnableTest {
+
+  private SitePaths sitePaths;
+  private EventBus eventBus;
+  private ReloadTrackerSubscriber onReloadSubscriber;
+  private String pluginName;
+  private ReplicationQueue replicationQueueMock;
+
+  @Before
+  public void setUp() throws IOException {
+    Path tmp = Files.createTempFile(pluginName, "_site");
+    Files.deleteIfExists(tmp);
+    sitePaths = new SitePaths(tmp);
+    pluginName = "replication";
+    eventBus = new EventBus();
+    onReloadSubscriber = new ReloadTrackerSubscriber();
+    eventBus.register(onReloadSubscriber);
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+  }
+
+  @Test
+  public void configurationIsReloadedWhenParsingSucceeds() {
+    ConfigParser parser = new TestValidConfigurationListener();
+
+    attemptAutoReload(parser);
+
+    assertThat(onReloadSubscriber.reloaded).isTrue();
+  }
+
+  @Test
+  public void configurationIsNotReloadedWhenParsingFails() {
+    ConfigParser parser = new TestInvalidConfigurationListener();
+
+    attemptAutoReload(parser);
+
+    assertThat(onReloadSubscriber.reloaded).isFalse();
+  }
+
+  private void attemptAutoReload(ConfigParser validator) {
+    final AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            validator, newVersionConfigProvider(), eventBus, Providers.of(replicationQueueMock));
+
+    autoReloadRunnable.run();
+  }
+
+  private Provider<ReplicationConfig> newVersionConfigProvider() {
+    return new Provider<ReplicationConfig>() {
+      @Override
+      public ReplicationConfig get() {
+        return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
+          @Override
+          public String getVersion() {
+            return String.format("%s", System.nanoTime());
+          }
+        };
+      }
+    };
+  }
+
+  private static class ReloadTrackerSubscriber {
+    public boolean reloaded = false;
+
+    @Subscribe
+    public void onReload(
+        @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) {
+      reloaded = true;
+    }
+  }
+
+  private static class TestValidConfigurationListener implements ConfigParser {
+    @Override
+    public List<RemoteConfiguration> parseRemotes(Config newConfig) {
+      return Collections.emptyList();
+    }
+  }
+
+  private static class TestInvalidConfigurationListener implements ConfigParser {
+    @Override
+    public List<RemoteConfiguration> parseRemotes(Config configurationChangeEvent)
+        throws ConfigInvalidException {
+      throw new ConfigInvalidException("expected test failure");
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
new file mode 100644
index 0000000..2f7059e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
@@ -0,0 +1,118 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeExecutorService implements ScheduledExecutorService {
+  public Runnable refreshCommand = () -> {};
+
+  @Override
+  public void shutdown() {}
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    return null;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return false;
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return null;
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return null;
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return null;
+  }
+
+  @Override
+  public void execute(Runnable command) {}
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    refreshCommand = command;
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return null;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
new file mode 100644
index 0000000..8cba4bb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
@@ -0,0 +1,286 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.io.MoreFiles;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FanoutReplicationConfigTest extends AbstractConfigTest {
+
+  public FanoutReplicationConfigTest() throws IOException {
+    super();
+  }
+
+  String remoteName1 = "foo";
+  String remoteUrl1 = "ssh://git@git.somewhere.com/${name}";
+  String remoteName2 = "bar";
+  String remoteUrl2 = "ssh://git@git.elsewhere.com/${name}";
+
+  @Before
+  public void setupTests() {
+    FileBasedConfig config = newReplicationConfig();
+    try {
+      config.save();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void shouldSkipRemoteConfigFromReplicationConfig() throws Exception {
+    String remoteName = "foo";
+    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
+
+    FileBasedConfig config = newReplicationConfig();
+    config.setString("remote", remoteName, "url", remoteUrl);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldLoadDestinationsFromMultipleFiles() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldIgnoreDestinationsFromSubdirectories() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig("subdirectory/" + remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldIgnoreNonConfigFiles() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".yaml").toFile(),
+            FS.DETECTED);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test(expected = ConfigInvalidException.class)
+  public void shouldThrowConfigInvalidExceptionWhenUrlIsMissingName() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", "ssh://git@git.elsewhere.com/name");
+    config.save();
+
+    newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+  }
+
+  @Test
+  public void shouldIgnoreEmptyConfigFile() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldIgnoreConfigWhenMoreThanOneRemoteInASingleFile() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.setString("remote", remoteName2, "url", remoteUrl2);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldIgnoreConfigRemoteSection() throws Exception {
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("replication", null, "url", remoteUrl1);
+    config.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(0);
+  }
+
+  @Test
+  public void shouldReturnSameVersionWhenNoChanges() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    objectUnderTest = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    assertThat(objectUnderTest.getVersion()).isEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileAdded() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileIsModified() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileRemoved() throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String version = objectUnderTest.getVersion();
+    assertThat(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete())
+        .isTrue();
+
+    assertThat(objectUnderTest.getVersion()).isNotEqualTo(version);
+  }
+
+  @Test
+  public void shouldReturnReplicationConfigVersionWhenReplicationConfigDirectoryRemoved()
+      throws Exception {
+
+    FileBasedConfig config = newRemoteConfig(remoteName1);
+    config.setString("remote", null, "url", remoteUrl1);
+    config.save();
+
+    config = newRemoteConfig(remoteName2);
+    config.setString("remote", null, "url", remoteUrl2);
+    config.save();
+
+    FanoutReplicationConfig objectUnderTest =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    String replicationConfigVersion =
+        new ReplicationFileBasedConfig(sitePaths, pluginDataPath).getVersion();
+
+    MoreFiles.deleteRecursively(sitePaths.etc_dir.resolve("replication"), ALLOW_INSECURE);
+
+    assertThat(objectUnderTest.getVersion()).isEqualTo(replicationConfigVersion);
+  }
+
+  protected FileBasedConfig newRemoteConfig(String configFileName) {
+    return new FileBasedConfig(
+        sitePaths.etc_dir.resolve("replication/" + configFileName + ".config").toFile(),
+        FS.DETECTED);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 0f6d629..2ee9a39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -14,11 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -36,14 +35,12 @@
 
   @Before
   public void setUp() throws Exception {
-    dispatcherMock = createMock(EventDispatcher.class);
-    replay(dispatcherMock);
+    dispatcherMock = mock(EventDispatcher.class);
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
   }
 
   @Test
   public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -51,9 +48,6 @@
             "someHost",
             RefPushResult.SUCCEEDED,
             RemoteRefUpdate.Status.OK);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -61,12 +55,11 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.SUCCEEDED,
         RemoteRefUpdate.Status.OK);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -74,9 +67,6 @@
             "someHost",
             RefPushResult.FAILED,
             RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -84,19 +74,15 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.FAILED,
         RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void onAllNodesReplicated() throws PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicationDoneEvent expectedDoneEvent =
         new RefReplicationDoneEvent("someProject", "refs/heads/master", 5);
-    dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c010ddb..c09fcd1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,19 +14,18 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.permissions.PermissionBackend;
@@ -45,8 +44,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import junit.framework.AssertionFailedError;
-import org.easymock.IAnswer;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
@@ -68,6 +65,8 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class PushOneTest {
   private static final int TEST_PUSH_TIMEOUT_SECS = 10;
@@ -86,7 +85,7 @@
   private IdGenerator idGeneratorMock;
   private ReplicationStateListeners replicationStateListenersMock;
   private ReplicationMetrics replicationMetricsMock;
-  private Timer1.Context timerContextMock;
+  private Timer1.Context<String> timerContextMock;
   private ProjectCache projectCacheMock;
   private TransportFactory transportFactoryMock;
   private Transport transportMock;
@@ -108,7 +107,7 @@
 
   @Before
   public void setup() throws Exception {
-    projectNameKey = new Project.NameKey("fooProject");
+    projectNameKey = Project.nameKey("fooProject");
     urish = new URIish("http://foo.com/fooProject.git");
 
     newLocalRef =
@@ -126,6 +125,7 @@
     setupMocks();
   }
 
+  @SuppressWarnings("unchecked")
   private void setupMocks() throws Exception {
     FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
     config.setString("remote", "Replication", "push", "foo");
@@ -134,8 +134,8 @@
     setupRepositoryMock(config);
     setupGitRepoManagerMock();
 
-    projectStateMock = createNiceMock(ProjectState.class);
-    forProjectMock = createNiceMock(ForProject.class);
+    projectStateMock = mock(ProjectState.class);
+    forProjectMock = mock(ForProject.class);
     setupWithUserMock();
     setupPermissionBackedMock();
 
@@ -144,46 +144,22 @@
     setupRefSpecMock();
     setupRemoteConfigMock();
 
-    credentialsFactory = createNiceMock(CredentialsFactory.class);
+    credentialsFactory = mock(CredentialsFactory.class);
 
     setupFetchConnectionMock();
     setupPushConnectionMock();
     setupRequestScopeMock();
-    idGeneratorMock = createNiceMock(IdGenerator.class);
-    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+    idGeneratorMock = mock(IdGenerator.class);
+    replicationStateListenersMock = mock(ReplicationStateListeners.class);
 
-    timerContextMock = createNiceMock(Timer1.Context.class);
+    timerContextMock = mock(Timer1.Context.class);
     setupReplicationMetricsMock();
 
     setupTransportMock();
 
     setupProjectCacheMock();
 
-    replicationConfigMock = createNiceMock(ReplicationConfig.class);
-
-    replay(
-        gitRepositoryManagerMock,
-        refUpdateMock,
-        repositoryMock,
-        permissionBackendMock,
-        destinationMock,
-        remoteConfigMock,
-        credentialsFactory,
-        threadRequestScoperMock,
-        idGeneratorMock,
-        replicationStateListenersMock,
-        replicationMetricsMock,
-        projectCacheMock,
-        timerContextMock,
-        transportFactoryMock,
-        projectStateMock,
-        withUserMock,
-        forProjectMock,
-        fetchConnection,
-        pushConnection,
-        refSpecMock,
-        refDatabaseMock,
-        replicationConfigMock);
+    replicationConfigMock = mock(ReplicationConfig.class);
   }
 
   @Test
@@ -212,10 +188,7 @@
 
     PushResult pushResult = new PushResult();
 
-    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
-        .andReturn(pushResult)
-        .once();
-    replay(transportMock);
+    when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult);
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
@@ -223,8 +196,6 @@
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
-
-    verify(transportMock);
   }
 
   @Test
@@ -241,12 +212,6 @@
               }
             });
 
-    // easymock way to check if method was never called
-    expect(transportMock.push(anyObject(), anyObject()))
-        .andThrow(new AssertionFailedError())
-        .anyTimes();
-    replay(transportMock);
-
     PushOne pushOne = createPushOne(replicationPushFilter);
 
     pushOne.addRef(PushOne.ALL_REFS);
@@ -254,7 +219,7 @@
 
     isCallFinished.await(10, TimeUnit.SECONDS);
 
-    verify(transportMock);
+    verify(transportMock, never()).push(any(), any());
   }
 
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
@@ -281,31 +246,31 @@
   }
 
   private void setupProjectCacheMock() throws IOException {
-    projectCacheMock = createNiceMock(ProjectCache.class);
-    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+    projectCacheMock = mock(ProjectCache.class);
+    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
-    transportMock = createNiceMock(Transport.class);
-    expect(transportMock.openFetch()).andReturn(fetchConnection);
-    transportFactoryMock = createNiceMock(TransportFactory.class);
-    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+    transportMock = mock(Transport.class);
+    when(transportMock.openFetch()).thenReturn(fetchConnection);
+    transportFactoryMock = mock(TransportFactory.class);
+    when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock);
   }
 
   private void setupReplicationMetricsMock() {
-    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
-    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+    replicationMetricsMock = mock(ReplicationMetrics.class);
+    when(replicationMetricsMock.start(any())).thenReturn(timerContextMock);
   }
 
   private void setupRequestScopeMock() {
-    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
-    expect(threadRequestScoperMock.scope(anyObject()))
-        .andAnswer(
-            new IAnswer<Callable<Object>>() {
+    threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class);
+    when(threadRequestScoperMock.scope(any()))
+        .thenAnswer(
+            new Answer<Callable<Object>>() {
               @SuppressWarnings("unchecked")
               @Override
-              public Callable<Object> answer() throws Throwable {
-                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+              public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
                 return new Callable<Object>() {
 
                   @Override
@@ -316,66 +281,64 @@
                   }
                 };
               }
-            })
-        .anyTimes();
+            });
   }
 
   private void setupPushConnectionMock() {
-    pushConnection = createNiceMock(PushConnection.class);
-    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+    pushConnection = mock(PushConnection.class);
+    when(pushConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupFetchConnectionMock() {
-    fetchConnection = createNiceMock(FetchConnection.class);
-    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+    fetchConnection = mock(FetchConnection.class);
+    when(fetchConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupRemoteConfigMock() {
-    remoteConfigMock = createNiceMock(RemoteConfig.class);
-    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+    remoteConfigMock = mock(RemoteConfig.class);
+    when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock));
   }
 
   private void setupRefSpecMock() {
-    refSpecMock = createNiceMock(RefSpec.class);
-    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
-    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
-    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
-    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+    refSpecMock = mock(RefSpec.class);
+    when(refSpecMock.matchSource(any(String.class))).thenReturn(true);
+    when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock);
+    when(refSpecMock.getDestination()).thenReturn("fooProject");
+    when(refSpecMock.isForceUpdate()).thenReturn(false);
   }
 
   private void setupDestinationMock() {
-    destinationMock = createNiceMock(Destination.class);
-    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+    destinationMock = mock(Destination.class);
+    when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed());
   }
 
   private void setupPermissionBackedMock() {
-    permissionBackendMock = createNiceMock(PermissionBackend.class);
-    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+    permissionBackendMock = mock(PermissionBackend.class);
+    when(permissionBackendMock.currentUser()).thenReturn(withUserMock);
   }
 
   private void setupWithUserMock() {
-    withUserMock = createNiceMock(WithUser.class);
-    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+    withUserMock = mock(WithUser.class);
+    when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock);
   }
 
   private void setupGitRepoManagerMock() throws IOException {
-    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
-    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+    gitRepositoryManagerMock = mock(GitRepositoryManager.class);
+    when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock);
   }
 
   private void setupRepositoryMock(FileBasedConfig config) throws IOException {
-    repositoryMock = createNiceMock(Repository.class);
-    refDatabaseMock = createNiceMock(RefDatabase.class);
-    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
-    expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
-    expect(refDatabaseMock.getRefs()).andReturn(localRefs);
-    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+    repositoryMock = mock(Repository.class);
+    refDatabaseMock = mock(RefDatabase.class);
+    when(repositoryMock.getConfig()).thenReturn(config);
+    when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock);
+    when(refDatabaseMock.getRefs()).thenReturn(localRefs);
+    when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock);
   }
 
   private void setupRefUpdateMock() {
-    refUpdateMock = createNiceMock(RefUpdate.class);
-    expect(refUpdateMock.getOldObjectId())
-        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
-        .anyTimes();
+    refUpdateMock = mock(RefUpdate.class);
+    when(refUpdateMock.getOldObjectId())
+        .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
deleted file mode 100644
index 983e97f..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicatedEventEquals implements IArgumentMatcher {
-
-  private RefReplicatedEvent expected;
-
-  public RefReplicatedEventEquals(RefReplicatedEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicatedEvent)) {
-      return false;
-    }
-    RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedEvent.ref)) {
-      return false;
-    }
-    if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) {
-      return false;
-    }
-    if (!equals(expected.status, actualRefReplicatedEvent.status)) {
-      return false;
-    }
-    if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and targetNode \"");
-    buffer.append(expected.targetNode);
-    buffer.append("\" and status \"");
-    buffer.append(expected.status);
-    buffer.append("\" and refStatus \"");
-    buffer.append(expected.refStatus);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
deleted file mode 100644
index d1284e1..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicationDoneEventEquals implements IArgumentMatcher {
-
-  private RefReplicationDoneEvent expected;
-
-  public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicationDoneEvent)) {
-      return false;
-    }
-    RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) {
-      return false;
-    }
-    if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and nodesCount \"");
-    buffer.append(expected.nodesCount);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
deleted file mode 100644
index 111a792..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import java.util.Collection;
-import java.util.Objects;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.eclipse.jgit.transport.RemoteRefUpdate;
-
-public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
-  Collection<RemoteRefUpdate> expectedRemoteRefs;
-
-  public static Collection<RemoteRefUpdate> eqRemoteRef(
-      Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
-    return null;
-  }
-
-  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    this.expectedRemoteRefs = expectedRemoteRefs;
-  }
-
-  @Override
-  public boolean matches(Object argument) {
-    if (!(argument instanceof Collection)) return false;
-
-    @SuppressWarnings("unchecked")
-    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
-
-    if (expectedRemoteRefs.size() != refs.size()) return false;
-    return refs.stream()
-        .allMatch(
-            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("expected:" + expectedRemoteRefs.toString());
-  }
-
-  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
-    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
-        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
-        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
-        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
-        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
-        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
-        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
-        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 0a89106..f393036 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -21,7 +21,7 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import java.io.IOException;
@@ -126,6 +126,7 @@
     config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
     config.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
     return config;
   }
@@ -155,6 +156,14 @@
   }
 
   protected void reloadConfig() {
-    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  protected AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  protected <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
new file mode 100644
index 0000000..c32a55d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -0,0 +1,271 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.After;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationFanoutIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setAutoReload();
+    config.save();
+
+    setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project"));
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    if (Files.exists(sitePaths.etc_dir.resolve("replication"))) {
+      MoreFiles.deleteRecursively(
+          sitePaths.etc_dir.resolve("replication"), RecursiveDeleteOption.ALLOW_INSECURE);
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject(project + "replica1");
+    Project.NameKey targetProject2 = createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+
+    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest1.save();
+    dest2.save();
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private FileBasedConfig setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
+      throws IOException {
+    FileBasedConfig remoteConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(),
+            FS.DETECTED);
+
+    setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
+    return remoteConfig;
+  }
+
+  private void setAutoReload() throws IOException {
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  private void setReplicationDestination(
+      FileBasedConfig config, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", null, "url", replicaUrls);
+    config.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", null, "projects", prj));
+
+    config.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  @SuppressWarnings(
+      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
+  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    synchronized (tasksStorage) {
+      return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream())
+          .filter(task -> refmaskPattern.matcher(task.ref).matches())
+          .collect(toList());
+    }
+  }
+
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
+      for (Path path : files) {
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index 36cc209..efacae7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
 import java.util.List;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Test;
 
@@ -37,8 +36,10 @@
     config.setString("remote", remoteName, "url", remoteUrl);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
 
     assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
@@ -55,18 +56,13 @@
     config.setString("remote", remoteName2, "url", remoteUrl2);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
 
-    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
-    assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
-  }
-
-  private ReplicationFileBasedConfig newReplicationFileBasedConfig()
-      throws ConfigInvalidException, IOException {
-    ReplicationFileBasedConfig replicationConfig =
-        new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath);
-    return replicationConfig;
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a18e8f1..391c62d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -21,14 +21,19 @@
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
+import com.google.inject.Key;
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Constants;
@@ -56,6 +61,19 @@
               + TEST_PROJECT_CREATION_SECONDS);
 
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
+  private Path pluginDataDir;
+  private Path storagePath;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+  }
 
   @Test
   public void shouldReplicateNewProject() throws Exception {
@@ -65,7 +83,7 @@
     Project.NameKey sourceProject = createTestProject("foo");
 
     WaitUtil.waitUntil(
-        () -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica.git")),
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
         TEST_NEW_PROJECT_TIMEOUT);
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
@@ -109,7 +127,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -154,7 +172,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
@@ -295,10 +313,10 @@
     setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThrows(
         InterruptedException.class,
@@ -321,10 +339,10 @@
     reloadConfig();
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -346,7 +364,7 @@
     replicationQueueStart();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -356,6 +374,19 @@
     }
   }
 
+  @Test
+  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(tasksStorage.listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
+
+    waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
+    waitUntil(() -> tasksStorage.listRunning().size() == 0);
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
@@ -370,8 +401,8 @@
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
 
-  private void shutdownConfig() {
-    getAutoReloadConfigDecoratorInstance().shutdown();
+  private void shutdownDestinations() {
+    getInstance(DestinationsCollection.class).shutdown();
   }
 
   private void replicationQueueStart() {
@@ -382,16 +413,24 @@
     getReplicationQueueInstance().stop();
   }
 
-  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
-    return getInstance(AutoReloadConfigDecorator.class);
-  }
-
   private ReplicationQueue getReplicationQueueInstance() {
     return getInstance(ReplicationQueue.class);
   }
 
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
+      for (Path path : files) {
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
+      }
+    }
   }
 
   private boolean nonEmptyProjectExists(Project.NameKey name) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..e0de577 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -15,11 +15,9 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.resetToDefault;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
 import java.net.URISyntaxException;
@@ -35,8 +33,7 @@
 
   @Before
   public void setUp() throws Exception {
-    pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
-    replay(pushResultProcessingMock);
+    pushResultProcessingMock = mock(PushResultProcessing.class);
     replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
@@ -53,52 +50,36 @@
 
   @Test
   public void shouldFireOneReplicationEventWhenNothingToReplicate() {
-    resetToDefault(pushResultProcessingMock);
-
-    // expected event
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected event
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://someHost/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.markAllPushTasksScheduled();
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://someHost1/someRepo.git");
     URIish uri2 = new URIish("git://someHost2/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.increasePushTaskCount("someProject", "someRef");
@@ -107,33 +88,29 @@
         "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject",
+            "someRef",
+            uri2,
+            RefPushResult.FAILED,
+            RemoteRefUpdate.Status.NON_EXISTING);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
     URIish uri2 = new URIish("git://host2/someRepo.git");
     URIish uri3 = new URIish("git://host3/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref1");
@@ -151,24 +128,32 @@
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5);
   }
 
   @Test
   public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("project1", "ref1");
     replicationState.increasePushTaskCount("project2", "ref2");
@@ -177,25 +162,24 @@
         "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref2");
@@ -204,7 +188,17 @@
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index 3b7bb3e..c0ae479 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -20,9 +20,10 @@
 
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.reviewdb.client.Project;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -60,7 +61,7 @@
 
     createChange();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+    assertThat(listWaitingReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
   }
 
   @Test
@@ -75,7 +76,7 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, null, new ReplicationState(NO_OP), false);
 
-    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+    assertThat(listWaitingReplicationTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
   }
 
   @Test
@@ -88,12 +89,21 @@
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
     reloadConfig();
 
-    String changeRef = createChange().getPatchSet().getRefName();
-    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+    String changeRef = createChange().getPatchSet().refName();
+    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+              }
+            });
     reloadConfig();
 
-    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
   }
 
   @Test
@@ -108,8 +118,17 @@
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
     reloadConfig();
 
-    String changeRef = createChange().getPatchSet().getRefName();
-    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+    String changeRef = createChange().getPatchSet().refName();
+    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+              }
+            });
 
     setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
@@ -129,14 +148,14 @@
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
     reloadConfig();
 
-    String changeRef1 = createChange().getPatchSet().getRefName();
-    String changeRef2 = createChange().getPatchSet().getRefName();
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
     reloadConfig();
 
-    assertThat(changeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
   }
 
   @Test
@@ -151,8 +170,8 @@
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
     reloadConfig();
 
-    String changeRef1 = createChange().getPatchSet().getRefName();
-    String changeRef2 = createChange().getPatchSet().getRefName();
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
 
     setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
@@ -179,8 +198,8 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
 
-    assertThat(tasksStorage.list()).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
       assertThat(task.uri).isEqualTo(expectedURI);
       assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
     }
@@ -201,8 +220,8 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
 
-    assertThat(tasksStorage.list()).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
       assertThat(task.uri).isEqualTo(expectedURI);
       assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
     }
@@ -235,19 +254,26 @@
 
     gApi.projects().name(project.get()).branch(branchToDelete).delete();
 
-    assertThat(listReplicationTasks(branchToDelete)).hasSize(1);
+    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
   }
 
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+  private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
       String changeRef, String remote) {
-    return tasksStorage.list().stream()
+    return tasksStorage.listWaiting().stream()
         .filter(task -> changeRef.equals(task.ref))
         .filter(task -> remote.equals(task.remote));
   }
 
-  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
+    return updates
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
+  private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
+    return tasksStorage.listWaiting().stream()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
new file mode 100644
index 0000000..ca69644
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -0,0 +1,380 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.Task;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageTaskTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+
+  protected ReplicationTasksStorage tasksStorage;
+  protected FileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix());
+    tasksStorage = new ReplicationTasksStorage(fileSystem.getPath("replication_site"));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fileSystem.close();
+  }
+
+  @Test
+  public void createdTaskIsWaiting() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+
+    original.create();
+    assertIsWaiting(original);
+    assertIsWaiting(persistedView);
+  }
+
+  @Test
+  public void startedTaskIsRunning() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    assertNotWaiting(original);
+    assertIsRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+    assertIsRunning(persistedView);
+  }
+
+  @Test
+  public void resetTaskIsWaiting() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+    assertIsWaiting(original);
+    assertNotRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertIsWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void finishedTaskIsRemoved() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.finish();
+    assertNotWaiting(original);
+    assertNotRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void createWaitingTaskIsDeduped() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    String key = original.create();
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    assertEquals(secondUpdate.create(), key);
+    assertIsWaiting(secondUpdate);
+    assertIsWaiting(original);
+  }
+
+  @Test
+  public void createWaitingTaskWhileTaskIsRunning() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    original.create();
+    original.start();
+    assertIsRunning(original);
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    secondUpdate.create();
+
+    assertIsWaiting(secondUpdate);
+    assertIsRunning(original);
+  }
+
+  @Test
+  public void canCompleteTwoTasks() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    original.create();
+    original.start();
+    original.finish();
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    secondUpdate.create();
+    assertIsWaiting(secondUpdate);
+    secondUpdate.start();
+    assertIsRunning(secondUpdate);
+    secondUpdate.finish();
+    assertNotWaiting(secondUpdate);
+    assertNotRunning(secondUpdate);
+  }
+
+  @Test
+  public void canStartResetTask() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.start();
+    assertIsRunning(persistedView);
+    assertIsRunning(original);
+  }
+
+  @Test
+  public void canResetPreviouslyResetAndStartedTask() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.start();
+    persistedView.reset();
+    assertIsWaiting(persistedView);
+    assertIsWaiting(original);
+  }
+
+  @Test
+  public void multipleActorsCanUpdateSameTask() throws Exception {
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+
+    Task fromEvent = tasksStorage.new Task(REF_UPDATE);
+    fromEvent.create();
+    assertIsWaiting(persistedView);
+
+    Task fromPusherStart = tasksStorage.new Task(REF_UPDATE);
+    fromPusherStart.start();
+    assertIsRunning(persistedView);
+
+    Task fromPusherEnd = tasksStorage.new Task(REF_UPDATE);
+    fromPusherEnd.finish();
+    assertNotWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentProjects() throws Exception {
+    Task updateA =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+    Task updateB =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentProjects() throws Exception {
+    Task updateA =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+    Task updateB =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void illegalFinishNonRunningTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+    task.finish();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalResetNonRunningTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+    task.reset();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalResetFinishedTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+
+    task.create();
+    task.start();
+    task.finish();
+    task.reset();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalFinishFinishedTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+
+    task.create();
+    task.start();
+    task.finish();
+    task.finish();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalFinishResetTaskIsGraceful() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.finish();
+    assertIsWaiting(persistedView);
+  }
+
+  @Test
+  public void illegalResetResetTaskIsGraceful() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.reset();
+    assertIsWaiting(persistedView);
+  }
+
+  private void assertIsWaiting(Task task) {
+    assertTrue(whiteBoxIsWaiting(task));
+  }
+
+  private void assertNotWaiting(Task task) {
+    assertFalse(whiteBoxIsWaiting(task));
+  }
+
+  private void assertIsRunning(Task task) {
+    assertTrue(whiteBoxIsRunning(task));
+  }
+
+  private void assertNotRunning(Task task) {
+    assertFalse(whiteBoxIsRunning(task));
+  }
+
+  private boolean whiteBoxIsRunning(Task task) {
+    return Files.exists(task.running);
+  }
+
+  private boolean whiteBoxIsWaiting(Task task) {
+    return Files.exists(task.waiting);
+  }
+
+  public static URIish getUrish(String uri) {
+    try {
+      return new URIish(uri);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Cannot instantiate URIish object", e);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 3ee85d7..0a92c76 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -25,6 +25,7 @@
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.Objects;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
@@ -42,12 +43,14 @@
   protected ReplicationTasksStorage storage;
   protected FileSystem fileSystem;
   protected Path storageSite;
+  protected UriUpdates uriUpdates;
 
   @Before
   public void setUp() throws Exception {
     fileSystem = Jimfs.newFileSystem(Configuration.unix());
     storageSite = fileSystem.getPath("replication_site");
     storage = new ReplicationTasksStorage(storageSite);
+    uriUpdates = TestUriUpdates.create(REF_UPDATE);
   }
 
   @After
@@ -57,48 +60,64 @@
 
   @Test
   public void canListEmptyStorage() throws Exception {
-    assertThat(storage.list()).isEmpty();
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(storage.listRunning()).isEmpty();
   }
 
   @Test
-  public void canListPersistedUpdate() throws Exception {
-    storage.persist(REF_UPDATE);
-    assertContainsExactly(storage, REF_UPDATE);
+  public void canListWaitingUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
   }
 
   @Test
-  public void canDeletePersistedUpdate() throws Exception {
-    storage.persist(REF_UPDATE);
-    storage.delete(REF_UPDATE);
-    assertThat(storage.list()).isEmpty();
+  public void canStartWaitingUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    assertThat(storage.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+  }
+
+  @Test
+  public void canFinishRunningUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.finish(uriUpdates);
+    assertNoIncompleteTasks(storage);
   }
 
   @Test
   public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
     ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite);
 
-    assertThat(storage.list()).isEmpty();
-    assertThat(persistedView.list()).isEmpty();
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(persistedView.listWaiting()).isEmpty();
 
-    storage.persist(REF_UPDATE);
-    assertContainsExactly(storage, REF_UPDATE);
-    assertContainsExactly(persistedView, REF_UPDATE);
+    storage.create(REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
 
-    storage.delete(REF_UPDATE);
-    assertThat(storage.list()).isEmpty();
-    assertThat(persistedView.list()).isEmpty();
+    storage.start(uriUpdates);
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(persistedView.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    storage.finish(uriUpdates);
+    assertThat(storage.listRunning()).isEmpty();
+    assertThat(persistedView.listRunning()).isEmpty();
   }
 
   @Test
-  public void sameRefUpdatePersistedTwiceIsStoredOnce() throws Exception {
-    String key = storage.persist(REF_UPDATE);
-    String secondKey = storage.persist(REF_UPDATE);
+  public void sameRefUpdateCreatedTwiceIsStoredOnce() throws Exception {
+    String key = storage.create(REF_UPDATE);
+    String secondKey = storage.create(REF_UPDATE);
     assertEquals(key, secondKey);
-    assertContainsExactly(storage, REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
   }
 
   @Test
-  public void canPersistDifferentUris() throws Exception {
+  public void canCreateDifferentUris() throws Exception {
     ReplicateRefUpdate updateB =
         new ReplicateRefUpdate(
             PROJECT,
@@ -106,32 +125,56 @@
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
 
-    String keyA = storage.persist(REF_UPDATE);
-    String keyB = storage.persist(updateB);
-    assertThat(storage.list()).hasSize(2);
+    String keyA = storage.create(REF_UPDATE);
+    String keyB = storage.create(updateB);
+    assertThat(storage.listWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
   }
 
   @Test
-  public void canDeleteDifferentUris() throws Exception {
+  public void canStartDifferentUris() throws Exception {
     ReplicateRefUpdate updateB =
         new ReplicateRefUpdate(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    storage.persist(REF_UPDATE);
-    storage.persist(updateB);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
 
-    storage.delete(REF_UPDATE);
-    assertContainsExactly(storage, updateB);
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listWaiting(), updateB);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
 
-    storage.delete(updateB);
-    assertThat(storage.list()).isEmpty();
+    storage.start(uriUpdatesB);
+    assertThat(storage.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
   }
 
   @Test
-  public void differentUrisPersistedTwiceIsStoredOnce() throws Exception {
+  public void canFinishDifferentUris() throws Exception {
+    ReplicateRefUpdate updateB =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    storage.start(uriUpdates);
+    storage.start(uriUpdatesB);
+
+    storage.finish(uriUpdates);
+    assertContainsExactly(storage.listRunning(), updateB);
+
+    storage.finish(uriUpdatesB);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void differentUrisCreatedTwiceIsStoredOnce() throws Exception {
     ReplicateRefUpdate updateB =
         new ReplicateRefUpdate(
             PROJECT,
@@ -139,72 +182,188 @@
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
 
-    storage.persist(REF_UPDATE);
-    storage.persist(updateB);
-    storage.persist(REF_UPDATE);
-    storage.persist(updateB);
-    assertThat(storage.list()).hasSize(2);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    assertThat(storage.listWaiting()).hasSize(2);
   }
 
   @Test
-  public void canPersistMulipleRefsForSameUri() throws Exception {
+  public void canCreateMulipleRefsForSameUri() throws Exception {
     ReplicateRefUpdate refA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
     ReplicateRefUpdate refB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
 
-    String keyA = storage.persist(refA);
-    String keyB = storage.persist(refB);
-    assertThat(storage.list()).hasSize(2);
+    String keyA = storage.create(refA);
+    String keyB = storage.create(refB);
+    assertThat(storage.listWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
   }
 
   @Test
-  public void canDeleteMulipleRefsForSameUri() throws Exception {
-    ReplicateRefUpdate refA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
-    ReplicateRefUpdate refB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
-    storage.persist(refA);
-    storage.persist(refB);
+  public void canFinishMulipleRefsForSameUri() throws Exception {
+    ReplicateRefUpdate refUpdateA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
+    ReplicateRefUpdate refUpdateB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+    UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB);
+    storage.create(refUpdateA);
+    storage.create(refUpdateB);
+    storage.start(uriUpdatesA);
+    storage.start(uriUpdatesB);
 
-    storage.delete(refA);
-    assertContainsExactly(storage, refB);
+    storage.finish(uriUpdatesA);
+    assertContainsExactly(storage.listRunning(), refUpdateB);
 
-    storage.delete(refB);
-    assertThat(storage.list()).isEmpty();
+    storage.finish(uriUpdatesB);
+    assertThat(storage.listRunning()).isEmpty();
   }
 
-  @Test(expected = Test.None.class /* no exception expected */)
-  public void illegalDeleteNonPersistedIsGraceful() throws Exception {
-    storage.delete(REF_UPDATE);
+  @Test
+  public void canResetUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+
+    storage.reset(uriUpdates);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThat(storage.listRunning()).isEmpty();
   }
 
-  @Test(expected = Test.None.class /* no exception expected */)
-  public void illegalDoubleDeleteIsGraceful() throws Exception {
-    storage.persist(REF_UPDATE);
-    storage.delete(REF_UPDATE);
+  @Test
+  public void canCompleteResetUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.reset(uriUpdates);
 
-    storage.delete(REF_UPDATE);
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThat(storage.listWaiting()).isEmpty();
+
+    storage.finish(uriUpdates);
+    assertNoIncompleteTasks(storage);
   }
 
-  @Test(expected = Test.None.class /* no exception expected */)
-  public void illegalDoubleDeleteDifferentUriIsGraceful() throws Exception {
+  @Test
+  public void canResetAllEmpty() throws Exception {
+    storage.resetAll();
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canResetAllUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+
+    storage.resetAll();
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void canCompleteResetAllUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.resetAll();
+
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThat(storage.listWaiting()).isEmpty();
+
+    storage.finish(uriUpdates);
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canResetAllMultipleUpdates() throws Exception {
     ReplicateRefUpdate updateB =
         new ReplicateRefUpdate(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    storage.persist(REF_UPDATE);
-    storage.persist(updateB);
-    storage.delete(REF_UPDATE);
-    storage.delete(updateB);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    storage.start(uriUpdates);
+    storage.start(uriUpdatesB);
 
-    storage.delete(REF_UPDATE);
-    storage.delete(updateB);
-    assertThat(storage.list()).isEmpty();
+    storage.resetAll();
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE, updateB);
+  }
+
+  @Test
+  public void canCompleteMultipleResetAllUpdates() throws Exception {
+    ReplicateRefUpdate updateB =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    storage.start(uriUpdates);
+    storage.start(uriUpdatesB);
+    storage.resetAll();
+
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), updateB);
+
+    storage.start(uriUpdatesB);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
+    assertThat(storage.listWaiting()).isEmpty();
+
+    storage.finish(uriUpdates);
+    storage.finish(uriUpdatesB);
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalFinishUncreatedIsGraceful() throws Exception {
+    storage.finish(uriUpdates);
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalDoubleFinishIsGraceful() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.finish(uriUpdates);
+
+    storage.finish(uriUpdates);
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalDoubleFinishDifferentUriIsGraceful() throws Exception {
+    ReplicateRefUpdate updateB =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    storage.create(REF_UPDATE);
+    storage.create(updateB);
+    storage.start(uriUpdates);
+    storage.start(uriUpdatesB);
+    storage.finish(uriUpdates);
+    storage.finish(uriUpdatesB);
+
+    storage.finish(uriUpdates);
+    storage.finish(uriUpdatesB);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(storage.listRunning()).isEmpty();
   }
 
   private void assertContainsExactly(
-      ReplicationTasksStorage tasksStorage, ReplicateRefUpdate update) {
-    assertTrue(equals(tasksStorage.list().get(0), update));
+      List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) {
+    assertThat(all).hasSize(refUpdates.length);
+    for (int i = 0; i < refUpdates.length; i++) {
+      assertTrue(equals(all.get(i), refUpdates[i]));
+    }
   }
 
   private boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
new file mode 100644
index 0000000..2fd3ee3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.auto.value.AutoValue;
+import com.google.gerrit.entities.Project;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Set;
+import org.eclipse.jgit.transport.URIish;
+
+@AutoValue
+public abstract class TestUriUpdates implements UriUpdates {
+  public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
+    return create(
+        Project.nameKey(update.project),
+        new URIish(update.uri),
+        update.remote,
+        Collections.singleton(update.ref));
+  }
+
+  public static TestUriUpdates create(
+      Project.NameKey project, URIish uri, String remote, Set<String> refs) {
+    return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+  }
+
+  @Override
+  public abstract Project.NameKey getProjectNameKey();
+
+  @Override
+  public abstract URIish getURI();
+
+  @Override
+  public abstract String getRemoteName();
+
+  @Override
+  public abstract Set<String> getRefs();
+}