Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Fix naming for delay for draining the replication event queue

Change-Id: If0462b358be65730f4d6531b1260f2ad3d9d972e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index acbf763..f3d2103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 
 public interface AdminApi {
   public boolean createProject(Project.NameKey project, String head);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index a43d7d9..fe5dbad 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,133 +14,46 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.FileUtil;
-import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.lib.Config;
 
 @Singleton
-public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener {
   private static final long RELOAD_DELAY = 120;
   private static final long RELOAD_INTERVAL = 60;
 
-  private volatile ReplicationFileBasedConfig currentConfig;
-  private long currentConfigTs;
-  private long lastFailedConfigTs;
+  private volatile ReplicationConfig currentConfig;
 
-  private final SitePaths site;
-  private final Destination.Factory destinationFactory;
-  private final Path pluginDataDir;
-  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
-  // ReplicationConfig
-  private final Provider<ReplicationQueue> replicationQueue;
   private final ScheduledExecutorService autoReloadExecutor;
   private ScheduledFuture<?> autoReloadRunnable;
-
-  private volatile boolean shuttingDown;
+  private final AutoReloadRunnable reloadRunner;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site,
-      Destination.Factory destinationFactory,
-      Provider<ReplicationQueue> replicationQueue,
-      @PluginData Path pluginDataDir,
       @PluginName String pluginName,
-      WorkQueue workQueue)
-      throws ConfigInvalidException, IOException {
-    this.site = site;
-    this.destinationFactory = destinationFactory;
-    this.pluginDataDir = pluginDataDir;
-    this.currentConfig = loadConfig();
-    this.currentConfigTs = getLastModified(currentConfig);
-    this.replicationQueue = replicationQueue;
+      WorkQueue workQueue,
+      @MainReplicationConfig ReplicationConfig replicationConfig,
+      AutoReloadRunnable reloadRunner,
+      EventBus eventBus) {
+    this.currentConfig = replicationConfig;
     this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config");
-  }
-
-  private static long getLastModified(ReplicationFileBasedConfig cfg) {
-    return FileUtil.lastModified(cfg.getCfgPath());
-  }
-
-  private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
-  }
-
-  private synchronized boolean isAutoReload() {
-    return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false);
-  }
-
-  @Override
-  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
-    reloadIfNeeded();
-    return currentConfig.getDestinations(uri, project, ref);
-  }
-
-  @Override
-  public synchronized List<Destination> getDestinations(FilterType filterType) {
-    return currentConfig.getDestinations(filterType);
-  }
-
-  @Override
-  public synchronized Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    return currentConfig.getURIs(remoteName, projectName, filterType);
-  }
-
-  private synchronized void reloadIfNeeded() {
-    reload(false);
+    this.reloadRunner = reloadRunner;
+    eventBus.register(this);
   }
 
   @VisibleForTesting
-  public void forceReload() {
-    reload(true);
-  }
-
-  private void reload(boolean force) {
-    if (force || isAutoReload()) {
-      ReplicationQueue queue = replicationQueue.get();
-
-      long lastModified = getLastModified(currentConfig);
-      try {
-        if (force
-            || (!shuttingDown
-                && lastModified > currentConfigTs
-                && lastModified > lastFailedConfigTs
-                && queue.isRunning()
-                && !queue.isReplaying())) {
-          queue.stop();
-          currentConfig = loadConfig();
-          currentConfigTs = lastModified;
-          lastFailedConfigTs = 0;
-          logger.atInfo().log(
-              "Configuration reloaded: %d destinations",
-              currentConfig.getDestinations(FilterType.ALL).size());
-        }
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log(
-            "Cannot reload replication configuration: keeping existing settings");
-        lastFailedConfigTs = lastModified;
-        return;
-      } finally {
-        queue.start();
-      }
-    }
+  public void reload() {
+    reloadRunner.reload();
   }
 
   @Override
@@ -159,45 +72,36 @@
   }
 
   @Override
-  public synchronized boolean isEmpty() {
-    return currentConfig.isEmpty();
-  }
-
-  @Override
   public Path getEventsDirectory() {
     return currentConfig.getEventsDirectory();
   }
 
-  /* shutdown() cannot be set as a synchronized method because
-   * it may need to wait for pending events to complete;
-   * e.g. when enabling the drain of replication events before
-   * shutdown.
-   *
-   * As a rule of thumb for synchronized methods, because they
-   * implicitly define a critical section and associated lock,
-   * they should never hold waiting for another resource, otherwise
-   * the risk of deadlock is very high.
-   *
-   * See more background about deadlocks, what they are and how to
-   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
-   */
   @Override
-  public int shutdown() {
-    this.shuttingDown = true;
-    if (autoReloadRunnable != null) {
-      autoReloadRunnable.cancel(false);
-      autoReloadRunnable = null;
-    }
-    return currentConfig.shutdown();
+  public synchronized void start() {
+    autoReloadRunnable =
+        autoReloadExecutor.scheduleAtFixedRate(
+            reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
 
   @Override
-  public synchronized void startup(WorkQueue workQueue) {
-    shuttingDown = false;
-    currentConfig.startup(workQueue);
-    autoReloadRunnable =
-        autoReloadExecutor.scheduleAtFixedRate(
-            this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
+  public synchronized void stop() {
+    if (autoReloadRunnable != null) {
+      if (!autoReloadRunnable.cancel(true)) {
+        throw new IllegalStateException(
+            "Unable to cancel replication reload task: cannot guarantee orderly shutdown");
+      }
+      autoReloadRunnable = null;
+    }
+  }
+
+  @Override
+  public String getVersion() {
+    return currentConfig.getVersion();
+  }
+
+  @Subscribe
+  public void onReload(ReplicationConfig newConfig) {
+    currentConfig = newConfig;
   }
 
   @Override
@@ -209,4 +113,9 @@
   public synchronized int getSshCommandTimeout() {
     return currentConfig.getSshCommandTimeout();
   }
+
+  @Override
+  public Config getConfig() {
+    return currentConfig.getConfig();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
new file mode 100644
index 0000000..71f7c67
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.util.List;
+
+public class AutoReloadRunnable implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final EventBus eventBus;
+  private final Provider<ObservableQueue> queueObserverProvider;
+  private final ConfigParser configParser;
+  private ReplicationConfig loadedConfig;
+  private Provider<ReplicationConfig> replicationConfigProvider;
+  private String loadedConfigVersion;
+  private String lastFailedConfigVersion;
+
+  @Inject
+  public AutoReloadRunnable(
+      ConfigParser configParser,
+      @MainReplicationConfig Provider<ReplicationConfig> replicationConfigProvider,
+      EventBus eventBus,
+      Provider<ObservableQueue> queueObserverProvider) {
+    this.replicationConfigProvider = replicationConfigProvider;
+    this.loadedConfig = replicationConfigProvider.get();
+    this.loadedConfigVersion = loadedConfig.getVersion();
+    this.lastFailedConfigVersion = "";
+    this.eventBus = eventBus;
+    this.queueObserverProvider = queueObserverProvider;
+    this.configParser = configParser;
+  }
+
+  @Override
+  public synchronized void run() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    ObservableQueue queue = queueObserverProvider.get();
+    if (pendingConfigVersion.equals(loadedConfigVersion)
+        || pendingConfigVersion.equals(lastFailedConfigVersion)
+        || !queue.isRunning()
+        || queue.isReplaying()) {
+      return;
+    }
+
+    reload();
+  }
+
+  synchronized void reload() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    try {
+      ReplicationConfig newConfig = replicationConfigProvider.get();
+      final List<RemoteConfiguration> newValidDestinations =
+          configParser.parseRemotes(newConfig.getConfig());
+      loadedConfig = newConfig;
+      loadedConfigVersion = newConfig.getVersion();
+      lastFailedConfigVersion = "";
+      eventBus.post(newValidDestinations);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot reload replication configuration: keeping existing settings");
+      lastFailedConfigVersion = pendingConfigVersion;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
index 98f364d..5bae0af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -31,11 +31,10 @@
   private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory;
   private volatile long secureCredentialsFactoryLoadTs;
   private final SitePaths site;
-  private ReplicationFileBasedConfig config;
+  private ReplicationConfig config;
 
   @Inject
-  public AutoReloadSecureCredentialsFactoryDecorator(
-      SitePaths site, ReplicationFileBasedConfig config)
+  public AutoReloadSecureCredentialsFactoryDecorator(SitePaths site, ReplicationConfig config)
       throws ConfigInvalidException, IOException {
     this.site = site;
     this.config = config;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
new file mode 100644
index 0000000..29ea706
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+
+/** Parses a {@link Config} into a collection of {@link RemoteConfiguration} objects. */
+public interface ConfigParser {
+
+  /**
+   * Parses the new replication config.
+   *
+   * @param config new configuration to parse
+   * @return list of parsed {@link RemoteConfiguration} objects
+   * @throws ConfigInvalidException if the new configuration is not valid
+   */
+  List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index a8dede3..fa26e82 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
@@ -31,7 +31,7 @@
   }
 
   private final RemoteConfig config;
-  private final ReplicationConfig replicationConfig;
+  private final DestinationsCollection destinations;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
   private final Project.NameKey project;
   private final String head;
@@ -39,21 +39,20 @@
   @Inject
   CreateProjectTask(
       RemoteConfig config,
-      ReplicationConfig replicationConfig,
+      DestinationsCollection destinations,
       DynamicItem<AdminApiFactory> adminApiFactory,
       @Assisted Project.NameKey project,
       @Assisted String head) {
     this.config = config;
-    this.replicationConfig = replicationConfig;
+    this.destinations = destinations;
     this.adminApiFactory = adminApiFactory;
     this.project = project;
     this.head = head;
   }
 
   public boolean create() {
-    return replicationConfig
-        .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values()
-        .stream()
+    return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION)
+        .values().stream()
         .map(u -> createProject(u, project, head))
         .reduce(true, (a, b) -> a && b);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index f9b2ad7..4617672 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 3b8208b..35470eb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,13 +26,13 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.reviewdb.client.AccountGroup;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.PluginUser;
 import com.google.gerrit.server.account.GroupBackend;
@@ -60,7 +60,6 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -554,6 +553,7 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
+              replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -580,36 +580,21 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      replicationTasksStorage.get().start(op);
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne task) {
+  void notifyFinished(PushOne op) {
     synchronized (stateLock) {
-      inFlight.remove(task.getURI());
-      if (!task.wasCanceled()) {
-        for (String ref : task.getRefs()) {
-          if (!refHasPendingPush(task.getURI(), ref)) {
-            replicationTasksStorage
-                .get()
-                .delete(
-                    new ReplicateRefUpdate(
-                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
-          }
-        }
+      if (!op.isRetrying()) {
+        replicationTasksStorage.get().finish(op);
       }
+      inFlight.remove(op.getURI());
     }
   }
 
-  private boolean refHasPendingPush(URIish opUri, String ref) {
-    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
-  }
-
-  private boolean pushContainsRef(PushOne op, String ref) {
-    return op != null && op.getRefs().contains(ref);
-  }
-
   boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
     return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
   }
@@ -634,21 +619,7 @@
   }
 
   boolean isSingleProjectMatch() {
-    List<String> projects = config.getProjects();
-    boolean ret = (projects.size() == 1);
-    if (ret) {
-      String projectMatch = projects.get(0);
-      if (ReplicationFilter.getPatternType(projectMatch)
-          != ReplicationFilter.PatternType.EXACT_MATCH) {
-        // projectMatch is either regular expression, or wild-card.
-        //
-        // Even though they might refer to a single project now, they need not
-        // after new projects have been created. Hence, we do not treat them as
-        // matching a single project.
-        ret = false;
-      }
-    }
-    return ret;
+    return config.isSingleProjectMatch();
   }
 
   boolean wouldPushRef(String ref) {
@@ -775,6 +746,10 @@
     return config.getDelay() * 1000L;
   }
 
+  int getSlowLatencyThreshold() {
+    return config.getSlowLatencyThreshold();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
@@ -794,7 +769,7 @@
       ReplicationScheduledEvent event =
           new ReplicationScheduledEvent(project.get(), ref, targetNode);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
@@ -808,7 +783,7 @@
       RefReplicatedEvent event =
           new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
new file mode 100644
index 0000000..4050c9c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -0,0 +1,103 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * Implementation of {@link ConfigParser} for parsing {@link Config} to a collection of {@link
+ * DestinationConfiguration} objects
+ */
+public class DestinationConfigParser implements ConfigParser {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.replication.ConfigParser#parseRemotes(org.eclipse.jgit.lib.Config)
+   */
+  @Override
+  public List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException {
+
+    if (config.getSections().isEmpty()) {
+      logger.atWarning().log("Replication config does not exist or it's empty; not replicating");
+      return Collections.emptyList();
+    }
+
+    boolean defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+
+    ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
+    for (RemoteConfig c : allRemotes(config)) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // If the destination for push is not set, assume it is equal to the source.
+      for (RefSpec ref : c.getPushRefSpecs()) {
+        if (ref.getDestination() == null) {
+          ref.setDestination(ref.getSource());
+        }
+      }
+
+      if (c.getPushRefSpecs().isEmpty()) {
+        c.addPushRefSpec(
+            new RefSpec()
+                .setSourceDestination("refs/*", "refs/*")
+                .setForceUpdate(defaultForceUpdate));
+      }
+
+      DestinationConfiguration destinationConfiguration = new DestinationConfiguration(c, config);
+
+      if (!destinationConfiguration.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format(
+                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
+                    c.getName(), u, config));
+          }
+        }
+      }
+
+      confs.add(destinationConfiguration);
+    }
+
+    return confs.build();
+  }
+
+  private static List<RemoteConfig> allRemotes(Config cfg) throws ConfigInvalidException {
+    Set<String> names = cfg.getSubsections("remote");
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        result.add(new RemoteConfig(cfg, name));
+      } catch (URISyntaxException e) {
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg), e);
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index f688cfc..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -16,13 +16,16 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-public class DestinationConfiguration {
+public class DestinationConfiguration implements RemoteConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
+  private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,7 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
+  private final int slowLatencyThreshold;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -67,16 +71,29 @@
     maxRetries =
         getInt(
             remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+
+    slowLatencyThreshold =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "remote",
+                remoteConfig.getName(),
+                "slowLatencyThreshold",
+                DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+                TimeUnit.SECONDS);
   }
 
+  @Override
   public int getDelay() {
     return delay;
   }
 
+  @Override
   public int getRescheduleDelay() {
     return rescheduleDelay;
   }
 
+  @Override
   public int getRetryDelay() {
     return retryDelay;
   }
@@ -93,26 +110,32 @@
     return lockErrorMaxRetries;
   }
 
+  @Override
   public ImmutableList<String> getUrls() {
     return urls;
   }
 
+  @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
   }
 
+  @Override
   public ImmutableList<String> getProjects() {
     return projects;
   }
 
+  @Override
   public ImmutableList<String> getAuthGroupNames() {
     return authGroupNames;
   }
 
+  @Override
   public String getRemoteNameStyle() {
     return remoteNameStyle;
   }
 
+  @Override
   public boolean replicatePermissions() {
     return replicatePermissions;
   }
@@ -129,10 +152,12 @@
     return replicateHiddenProjects;
   }
 
+  @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
   }
 
+  @Override
   public int getMaxRetries() {
     return maxRetries;
   }
@@ -140,4 +165,9 @@
   private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
     return cfg.getInt("remote", rc.getName(), name, defValue);
   }
+
+  @Override
+  public int getSlowLatencyThreshold() {
+    return slowLatencyThreshold;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
new file mode 100644
index 0000000..eaf5b27
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -0,0 +1,269 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Factory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class DestinationsCollection implements ReplicationDestinations {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Factory destinationFactory;
+  private final Provider<ReplicationQueue> replicationQueue;
+  private volatile List<Destination> destinations;
+  private boolean shuttingDown;
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
+  @Inject
+  public DestinationsCollection(
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
+      ReplicationConfig replicationConfig,
+      ConfigParser configParser,
+      EventBus eventBus)
+      throws ConfigInvalidException {
+    this.destinationFactory = destinationFactory;
+    this.replicationQueue = replicationQueue;
+    this.destinations =
+        allDestinations(
+            destinationFactory, configParser.parseRemotes(replicationConfig.getConfig()));
+    eventBus.register(this);
+  }
+
+  @Override
+  public Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
+    if (getAll(filterType).isEmpty()) {
+      return ImmutableMultimap.of();
+    }
+
+    SetMultimap<Destination, URIish> uris = HashMultimap.create();
+    for (Destination config : getAll(filterType)) {
+      if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) {
+        continue;
+      }
+
+      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
+        continue;
+      }
+
+      boolean adminURLUsed = false;
+
+      for (String url : config.getAdminUrls()) {
+        if (Strings.isNullOrEmpty(url)) {
+          continue;
+        }
+
+        URIish uri;
+        try {
+          uri = new URIish(url);
+        } catch (URISyntaxException e) {
+          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          continue;
+        }
+
+        if (!isGerrit(uri) && !isGerritHttp(uri)) {
+          String path =
+              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
+          if (path == null) {
+            repLog.warn("adminURL {} does not contain ${name}", uri);
+            continue;
+          }
+
+          uri = uri.setPath(path);
+          if (!isSSH(uri)) {
+            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            continue;
+          }
+        }
+        uris.put(config, uri);
+        adminURLUsed = true;
+      }
+
+      if (!adminURLUsed) {
+        for (URIish uri : config.getURIs(projectName, "*")) {
+          uris.put(config, uri);
+        }
+      }
+    }
+    return uris;
+  }
+
+  @Override
+  public List<Destination> getAll(FilterType filterType) {
+    Predicate<? super Destination> filter;
+    switch (filterType) {
+      case PROJECT_CREATION:
+        filter = dest -> dest.isCreateMissingRepos();
+        break;
+      case PROJECT_DELETION:
+        filter = dest -> dest.isReplicateProjectDeletions();
+        break;
+      case ALL:
+      default:
+        filter = dest -> true;
+        break;
+    }
+    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
+  }
+
+  @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    List<Destination> dests = new ArrayList<>();
+    for (Destination dest : getAll(FilterType.ALL)) {
+      if (dest.wouldPush(uri, project, ref)) {
+        dests.add(dest);
+      }
+    }
+    return dests;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return destinations.isEmpty();
+  }
+
+  @Override
+  public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
+    for (Destination cfg : destinations) {
+      cfg.start(workQueue);
+    }
+  }
+
+  /* shutdown() cannot be set as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when enabling the drain of replication events before
+   * shutdown.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never hold waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    synchronized (this) {
+      shuttingDown = true;
+    }
+
+    int discarded = 0;
+    for (Destination cfg : destinations) {
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
+    }
+    return discarded;
+  }
+
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        Thread.sleep(destination.getReplicationDelayMilliseconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  @Subscribe
+  public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
+    if (shuttingDown) {
+      logger.atWarning().log("Shutting down: configuration reload ignored");
+      return;
+    }
+
+    try {
+      replicationQueue.get().stop();
+      destinations = allDestinations(destinationFactory, remoteConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
+    } finally {
+      replicationQueue.get().start();
+    }
+  }
+
+  private List<Destination> allDestinations(
+      Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
+
+    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
+    for (RemoteConfiguration c : remoteConfigurations) {
+      if (c instanceof DestinationConfiguration) {
+        dest.add(destinationFactory.create((DestinationConfiguration) c));
+      }
+    }
+    return dest.build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
new file mode 100644
index 0000000..4cc9974
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -0,0 +1,182 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.Files.getNameWithoutExtension;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.internal.storage.file.FileSnapshot;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+public class FanoutReplicationConfig implements ReplicationConfig {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final ReplicationFileBasedConfig replicationConfig;
+  private final Config config;
+  private final Path remoteConfigsDirPath;
+
+  @Inject
+  public FanoutReplicationConfig(SitePaths site, @PluginData Path pluginDataDir)
+      throws IOException, ConfigInvalidException {
+    // Remotes are read only from *.config files in the "replication" dir under site.etc_dir.
+    remoteConfigsDirPath = site.etc_dir.resolve("replication");
+    replicationConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+    config = replicationConfig.getConfig();
+    removeRemotes(config);
+
+    try (Stream<Path> files = Files.list(remoteConfigsDirPath)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(FanoutReplicationConfig::isConfig)
+          .map(FanoutReplicationConfig::loadConfig)
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .filter(FanoutReplicationConfig::isValid)
+          .forEach(cfg -> addRemoteConfig(cfg, config));
+    } catch (IllegalStateException e) {
+      throw new ConfigInvalidException(e.getMessage(), e); // preserve the original cause
+    }
+  }
+
+  private static void removeRemotes(Config config) {
+    Set<String> remoteNames = config.getSubsections("remote");
+    if (remoteNames.size() > 0) {
+      logger.atSevere().log(
+          "When replication directory is present replication.config file cannot contain remote configuration. Ignoring: %s",
+          String.join(",", remoteNames));
+
+      for (String name : remoteNames) {
+        config.unsetSection("remote", name);
+      }
+    }
+  }
+
+  private static void addRemoteConfig(FileBasedConfig source, Config destination) {
+    String remoteName = getNameWithoutExtension(source.getFile().getName());
+    for (String name : source.getNames("remote")) {
+      destination.setStringList(
+          "remote",
+          remoteName,
+          name,
+          Lists.newArrayList(source.getStringList("remote", null, name)));
+    }
+  }
+
+  private static boolean isValid(Config cfg) {
+    if (cfg.getSections().size() != 1 || !cfg.getSections().contains("remote")) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s must contain only one remote section.", cfg);
+      return false;
+    }
+    if (cfg.getSubsections("remote").size() > 0) {
+      logger.atSevere().log(
+          "Remote replication configuration file %s cannot contain remote subsections.", cfg);
+      return false;
+    }
+
+    return true;
+  }
+
+  private static Optional<FileBasedConfig> loadConfig(Path path) {
+    FileBasedConfig cfg = new FileBasedConfig(path.toFile(), FS.DETECTED);
+    try {
+      cfg.load();
+    } catch (IOException | ConfigInvalidException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot load remote replication configuration file %s.", path);
+      return Optional.empty();
+    }
+    return Optional.of(cfg);
+  }
+
+  private static boolean isConfig(Path p) {
+    return p.toString().endsWith(".config");
+  }
+
+  @Override
+  public boolean isReplicateAllOnPluginStart() {
+    return replicationConfig.isReplicateAllOnPluginStart();
+  }
+
+  @Override
+  public boolean isDefaultForceUpdate() {
+    return replicationConfig.isDefaultForceUpdate();
+  }
+
+  @Override
+  public int getMaxRefsToLog() {
+    return replicationConfig.getMaxRefsToLog();
+  }
+
+  @Override
+  public Path getEventsDirectory() {
+    return replicationConfig.getEventsDirectory();
+  }
+
+  @Override
+  public int getSshConnectionTimeout() {
+    return replicationConfig.getSshConnectionTimeout();
+  }
+
+  @Override
+  public int getSshCommandTimeout() {
+    return replicationConfig.getSshCommandTimeout();
+  }
+
+  @Override
+  public String getVersion() {
+    Hasher hasher = Hashing.murmur3_128().newHasher();
+    hasher.putString(replicationConfig.getVersion(), UTF_8);
+    try (Stream<Path> files = Files.list(remoteConfigsDirPath)) {
+      files
+          .filter(Files::isRegularFile)
+          .filter(FanoutReplicationConfig::isConfig)
+          .sorted()
+          .map(Path::toFile)
+          .map(FileSnapshot::save)
+          .forEach(
+              fileSnapshot ->
+                  // hashCode is based on file size, file key and last modified time
+                  hasher.putInt(fileSnapshot.hashCode()));
+      return hasher.hash().toString();
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot list remote configuration files from %s. Returning replication.config file version",
+          remoteConfigsDirPath);
+      return replicationConfig.getVersion();
+    }
+  }
+
+  @Override
+  public Config getConfig() {
+    return config;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index eac56df..66130f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -18,8 +18,8 @@
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 6dcc80e..d195aa3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index 07978ab..9264d9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -40,11 +40,11 @@
   @Option(name = "--json", usage = "output in json format")
   private boolean json;
 
-  @Inject private ReplicationConfig config;
+  @Inject private ReplicationDestinations destinations;
 
   @Override
   protected void run() {
-    for (Destination d : config.getDestinations(FilterType.ALL)) {
+    for (Destination d : destinations.getAll(FilterType.ALL)) {
       if (matches(d.getRemoteConfigName())) {
         printRemote(d);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index aa6e16c..da960e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.File;
 import java.io.IOException;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java
new file mode 100644
index 0000000..e8d95ec
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/MainReplicationConfig.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@BindingAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+public @interface MainReplicationConfig {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
new file mode 100644
index 0000000..a347f3a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.io.IOException;
+import java.util.Locale;
+
+/** Utilities for recognizing Stale NFS File Handle errors. */
+public class Nfs {
+  /**
+   * Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle.
+   *
+   * @param throwable head of the causal chain to inspect; {@code null} yields {@code false}
+   * @return true if the throwable or a cause in its causal chain is a Stale NFS File Handle
+   */
+  public static boolean isStaleFileHandleInCausalChain(Throwable throwable) {
+    while (throwable != null) {
+      if (throwable instanceof IOException && isStaleFileHandle((IOException) throwable)) {
+        return true;
+      }
+      throwable = throwable.getCause();
+    }
+    return false;
+  }
+
+  /**
+   * Determine if an IOException is a Stale NFS File Handle, judging by its message only.
+   *
+   * @param ioe the exception to check; a {@code null} message is never a match
+   * @return true if the lower-cased message matches {@code .*stale .*file .*handle.*}
+   */
+  public static boolean isStaleFileHandle(IOException ioe) {
+    String msg = ioe.getMessage();
+    return msg != null && msg.toLowerCase(Locale.ROOT).matches(".*stale .*file .*handle.*");
+  }
+
+  /** Rethrows {@code e} unless it or a cause in its causal chain is a Stale NFS File Handle. */
+  public static <T extends Throwable> void throwIfNotStaleFileHandle(T e) throws T {
+    if (!isStaleFileHandleInCausalChain(e)) {
+      throw e;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
new file mode 100644
index 0000000..1007ae5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+/** Allows a queue activity to be observed */
+public interface ObservableQueue {
+  /**
+   * Indicates whether the observed queue is running
+   *
+   * @return true, when the queue is running, false otherwise
+   */
+  boolean isRunning();
+
+  /**
+   * Indicates whether the observed queue is replaying queued events
+   *
+   * @return true, when the queue is replaying, false otherwise
+   */
+  boolean isReplaying();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 833b02b..4f60319 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.Nullable;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b488264..634f44d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toMap;
 
@@ -24,13 +25,13 @@
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
@@ -217,6 +218,10 @@
     return maxRetries == 0 || retryCount <= maxRetries;
   }
 
+  private void retryDone() {
+    this.retrying = false;
+  }
+
   void canceledByReplication() {
     canceled = true;
   }
@@ -334,14 +339,20 @@
     }
 
     repLog.info("Replication to {} started...", uri);
-    Timer1.Context context = metrics.start(config.getName());
+    Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
-      long startedAt = context.getStartTime();
+      long startedAt = destinationContext.getStartTime();
       long delay = NANOSECONDS.toMillis(startedAt - createdAt);
       metrics.record(config.getName(), delay, retryCount);
       git = gitManager.openRepository(projectName);
       runImpl();
-      long elapsed = NANOSECONDS.toMillis(context.stop());
+      long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
+
+      if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
+        metrics.recordSlowProjectReplication(
+            config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
+      }
+      retryDone();
       repLog.info(
           "Replication to {} completed in {}ms, {}ms delay, {} retries",
           uri,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index fccdb7b..d1ab790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -14,9 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.util.Objects;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
@@ -45,11 +46,40 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicatedEvent)) {
+      return false;
+    }
+    RefReplicatedEvent event = (RefReplicatedEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    if (!Objects.equals(event.targetNode, this.targetNode)) {
+      return false;
+    }
+    if (!Objects.equals(event.status, this.status)) {
+      return false;
+    }
+    if (!Objects.equals(event.refStatus, this.refStatus)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, targetNode, status, refStatus); // fields equals() compares
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 4789a96..e663194 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
 
 public class RefReplicationDoneEvent extends RefEvent {
   public static final String TYPE = "ref-replication-done";
@@ -33,11 +34,35 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicationDoneEvent)) {
+      return false;
+    }
+
+    RefReplicationDoneEvent event = (RefReplicationDoneEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    if (event.nodesCount != this.nodesCount) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, nodesCount); // same fields equals() compares
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
new file mode 100644
index 0000000..b66e73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+/** Remote configuration for a replication endpoint. */
+public interface RemoteConfiguration {
+  /**
+   * Time to wait before scheduling a remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getDelay();
+  /**
+   * Time to wait before rescheduling a remote replication operation, which might have failed the
+   * first time round. Setting to 0 effectively disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRescheduleDelay();
+  /**
+   * Time to wait before retrying a failed remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRetryDelay();
+  /**
+   * List of the remote endpoint addresses used for replication.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getUrls();
+  /**
+   * List of alternative remote endpoint addresses, used for admin operations, such as repository
+   * creation.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getAdminUrls();
+  /**
+   * List of repositories that should be replicated.
+   *
+   * @return list of project strings
+   */
+  ImmutableList<String> getProjects();
+  /**
+   * List of groups that should be used to access the repositories.
+   *
+   * @return list of group strings
+   */
+  ImmutableList<String> getAuthGroupNames();
+  /**
+   * Influence how the name of the remote repository should be computed.
+   *
+   * @return a string representing a remote style name
+   */
+  String getRemoteNameStyle();
+  /**
+   * If true, permissions-only projects and the refs/meta/config branch will also be replicated.
+   *
+   * @return true if permissions are replicated as well, false otherwise
+   */
+  boolean replicatePermissions();
+  /**
+   * The JGit remote configuration representing the replication for this endpoint.
+   *
+   * @return The remote config {@link RemoteConfig}
+   */
+  RemoteConfig getRemoteConfig();
+  /**
+   * Number of times to retry a replication operation.
+   *
+   * @return the number of retries
+   */
+  int getMaxRetries();
+
+  /**
+   * The time duration after which the replication for a project should be considered "slow".
+   *
+   * @return the slow latency threshold
+   */
+  int getSlowLatencyThreshold();
+
+  /**
+   * Whether the remote configuration is for a single project only.
+   *
+   * @return true, when configuration is for a single project, false otherwise
+   */
+  default boolean isSingleProjectMatch() {
+    List<String> projects = getProjects();
+    boolean ret = (projects.size() == 1);
+    if (ret) {
+      String projectMatch = projects.get(0);
+      if (ReplicationFilter.getPatternType(projectMatch)
+          != ReplicationFilter.PatternType.EXACT_MATCH) {
+        // projectMatch is either regular expression, or wild-card.
+        //
+        // Even though they might refer to a single project now, they need not
+        // after new projects have been created. Hence, we do not treat them as
+        // matching a single project.
+        ret = false;
+      }
+    }
+    return ret;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index e685215..7538298 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.IOException;
 import java.io.OutputStream;
 import org.eclipse.jgit.transport.URIish;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index ccdead8..b978952 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -11,46 +11,81 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.common.collect.Multimap;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.git.WorkQueue;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
-import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.lib.Config;
 
+/** Configuration of all the replication end points. */
 public interface ReplicationConfig {
 
+  /** Filter for accessing replication projects. */
   enum FilterType {
     PROJECT_CREATION,
     PROJECT_DELETION,
     ALL
   }
 
-  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
-
-  List<Destination> getDestinations(FilterType filterType);
-
-  Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
-
+  /**
+   * Returns current replication configuration of whether to replicate or not all the projects when
+   * the plugin starts.
+   *
+   * @return true if replication at plugin start, false otherwise.
+   */
   boolean isReplicateAllOnPluginStart();
 
+  /**
+   * Returns the default behaviour of the replication plugin when pushing to remote replication
+   * ends. Even though the property name has the 'update' suffix, it actually refers to Git push
+   * operation and not to a Git update.
+   *
+   * @return true if forced push is the default, false otherwise.
+   */
   boolean isDefaultForceUpdate();
 
+  /**
+   * Returns the maximum number of ref-specs to log into the replication_log whenever a push
+   * operation is completed against a replication end.
+   *
+   * @return maximum number of refs to log, zero if unlimited.
+   */
   int getMaxRefsToLog();
 
-  boolean isEmpty();
-
+  /**
+   * Configured location where the replication events are stored on the filesystem for being resumed
+   * and kept across restarts.
+   *
+   * @return path to store persisted events.
+   */
   Path getEventsDirectory();
 
-  int shutdown();
-
-  void startup(WorkQueue workQueue);
-
+  /**
+   * Timeout for establishing SSH connection to remote.
+   *
+   * @return connection timeout, zero if infinite.
+   */
   int getSshConnectionTimeout();
 
+  /**
+   * Timeout for executing an SSH command on remote.
+   *
+   * @return command timeout, zero if infinite.
+   */
   int getSshCommandTimeout();
+
+  /**
+   * Current logical version string of the current configuration loaded in memory, depending on the
+   * actual implementation of the configuration on the persistent storage.
+   *
+   * @return current logical version number.
+   */
+  String getVersion();
+
+  /**
+   * Return a copy of the current config.
+   *
+   * @return the config.
+   */
+  Config getConfig();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
new file mode 100644
index 0000000..18ccc66
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.Multimap;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationDestinations {
+
+  /**
+   * Return all the URIs associated with a project and a filter criterion.
+   *
+   * @param remoteName name of the replication end or empty if selecting all ends.
+   * @param projectName name of the project
+   * @param filterType type of filter criteria for selecting projects
+   * @return the multi-map of destinations and the associated replication URIs
+   */
+  Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
+
+  /**
+   * List of currently active replication destinations.
+   *
+   * @param filterType type of project filtering
+   * @return the list of active destinations
+   */
+  List<Destination> getAll(FilterType filterType);
+
+  /**
+   * Return the active replication destinations for a uri/project/ref triplet.
+   *
+   * @param uriish uri of the destinations
+   * @param project name of the project
+   * @param ref ref name
+   * @return the list of active destinations
+   */
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
+  /** @return true if there are no destinations, false otherwise. */
+  boolean isEmpty();
+
+  /**
+   * Start replicating to all destinations.
+   *
+   * @param workQueue execution queue for scheduling the replication events.
+   */
+  void startup(WorkQueue workQueue);
+
+  /**
+   * Stop the replication to all destinations.
+   *
+   * @return number of events cancelled during shutdown.
+   */
+  int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 3094929..e99f6b1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -13,52 +13,22 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.SetMultimap;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
 
-@Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
-  private List<Destination> destinations;
   private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
@@ -70,188 +40,24 @@
   private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(
-      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
-      throws ConfigInvalidException, IOException {
+  public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) {
     this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
-    this.destinations = allDestinations(destinationFactory);
-    this.pluginDataDir = pluginDataDir;
-  }
-
-  @Override
-  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
-    List<Destination> dests = new ArrayList<>();
-    for (Destination dest : getDestinations(FilterType.ALL)) {
-      if (dest.wouldPush(uri, project, ref)) {
-        dests.add(dest);
-      }
-    }
-    return dests;
-  }
-
-  /*
-   * (non-Javadoc)
-   * @see
-   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations
-   * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType)
-   */
-  @Override
-  public List<Destination> getDestinations(FilterType filterType) {
-    Predicate<? super Destination> filter;
-    switch (filterType) {
-      case PROJECT_CREATION:
-        filter = dest -> dest.isCreateMissingRepos();
-        break;
-      case PROJECT_DELETION:
-        filter = dest -> dest.isReplicateProjectDeletions();
-        break;
-      case ALL:
-      default:
-        filter = dest -> true;
-        break;
-    }
-    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
-  }
-
-  private List<Destination> allDestinations(Destination.Factory destinationFactory)
-      throws ConfigInvalidException, IOException {
-    if (!config.getFile().exists()) {
-      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-    if (config.getFile().length() == 0) {
-      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-
     try {
       config.load();
     } catch (ConfigInvalidException e) {
-      throw new ConfigInvalidException(
-          String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e);
+      repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e);
     } catch (IOException e) {
-      throw new IOException(
-          String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
+      repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e);
     }
-
-    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
-
-    defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
-
-    maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
-
-    sshCommandTimeout =
-        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
-    sshConnectionTimeout =
-        (int)
-            ConfigUtil.getTimeUnit(
-                config,
-                "gerrit",
-                null,
-                "sshConnectionTimeout",
-                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
-                MILLISECONDS);
-
-    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
-    for (RemoteConfig c : allRemotes(config)) {
-      if (c.getURIs().isEmpty()) {
-        continue;
-      }
-
-      // If destination for push is not set assume equal to source.
-      for (RefSpec ref : c.getPushRefSpecs()) {
-        if (ref.getDestination() == null) {
-          ref.setDestination(ref.getSource());
-        }
-      }
-
-      if (c.getPushRefSpecs().isEmpty()) {
-        c.addPushRefSpec(
-            new RefSpec()
-                .setSourceDestination("refs/*", "refs/*")
-                .setForceUpdate(defaultForceUpdate));
-      }
-
-      Destination destination = destinationFactory.create(new DestinationConfiguration(c, config));
-
-      if (!destination.isSingleProjectMatch()) {
-        for (URIish u : c.getURIs()) {
-          if (u.getPath() == null || !u.getPath().contains("${name}")) {
-            throw new ConfigInvalidException(
-                String.format(
-                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
-                    c.getName(), u, config.getFile()));
-          }
-        }
-      }
-
-      dest.add(destination);
-    }
-    return dest.build();
+    this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
+    this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+    this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
+    this.pluginDataDir = pluginDataDir;
   }
 
-  @Override
-  public Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    if (getDestinations(filterType).isEmpty()) {
-      return ImmutableMultimap.of();
-    }
-
-    SetMultimap<Destination, URIish> uris = HashMultimap.create();
-    for (Destination config : getDestinations(filterType)) {
-      if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) {
-        continue;
-      }
-
-      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
-        continue;
-      }
-
-      boolean adminURLUsed = false;
-
-      for (String url : config.getAdminUrls()) {
-        if (Strings.isNullOrEmpty(url)) {
-          continue;
-        }
-
-        URIish uri;
-        try {
-          uri = new URIish(url);
-        } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
-          continue;
-        }
-
-        if (!isGerrit(uri) && !isGerritHttp(uri)) {
-          String path =
-              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
-          if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
-            continue;
-          }
-
-          uri = uri.setPath(path);
-          if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
-            continue;
-          }
-        }
-        uris.put(config, uri);
-        adminURLUsed = true;
-      }
-
-      if (!adminURLUsed) {
-        for (URIish uri : config.getURIs(projectName, "*")) {
-          uris.put(config, uri);
-        }
-      }
-    }
-    return uris;
-  }
-
-  static String replaceName(String in, String name, boolean keyIsOptional) {
+  public static String replaceName(String in, String name, boolean keyIsOptional) {
     String key = "${name}";
     int n = in.indexOf(key);
     if (0 <= n) {
@@ -284,28 +90,6 @@
     return maxRefsToLog;
   }
 
-  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
-    Set<String> names = cfg.getSubsections("remote");
-    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      try {
-        result.add(new RemoteConfig(cfg, name));
-      } catch (URISyntaxException e) {
-        throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
-      }
-    }
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty()
-   */
-  @Override
-  public boolean isEmpty() {
-    return destinations.isEmpty();
-  }
-
   @Override
   public Path getEventsDirectory() {
     String eventsDirectory = config.getString("replication", null, "eventsDirectory");
@@ -320,66 +104,13 @@
   }
 
   @Override
-  public int shutdown() {
-    int discarded = 0;
-    for (Destination cfg : destinations) {
-      try {
-        drainReplicationEvents(cfg);
-      } catch (EventQueueNotEmptyException e) {
-        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
-      } finally {
-        discarded += cfg.shutdown();
-      }
-    }
-    return discarded;
-  }
-
-  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
-    int drainQueueAttempts = destination.getDrainQueueAttempts();
-    if (drainQueueAttempts == 0) {
-      return;
-    }
-    int pending = destination.getQueueInfo().pending.size();
-    int inFlight = destination.getQueueInfo().inFlight.size();
-
-    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
-      try {
-        logger.atInfo().log(
-            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
-            inFlight, pending);
-        Thread.sleep(destination.getReplicationDelayMilliseconds());
-      } catch (InterruptedException ie) {
-        logger.atWarning().withCause(ie).log(
-            "Wait for replication events to drain has been interrupted");
-      }
-      pending = destination.getQueueInfo().pending.size();
-      inFlight = destination.getQueueInfo().inFlight.size();
-      drainQueueAttempts--;
-    }
-
-    if (pending > 0 || inFlight > 0) {
-      throw new EventQueueNotEmptyException(
-          String.format("Pending: %d - InFlight: %d", pending, inFlight));
-    }
-  }
-
-  public static class EventQueueNotEmptyException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EventQueueNotEmptyException(String errorMessage) {
-      super(errorMessage);
-    }
-  }
-
-  FileBasedConfig getConfig() {
+  public Config getConfig() {
     return config;
   }
 
   @Override
-  public void startup(WorkQueue workQueue) {
-    for (Destination cfg : destinations) {
-      cfg.start(workQueue);
-    }
+  public String getVersion() {
+    return Long.toString(config.getFile().lastModified());
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 05bbb03..5b4204e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.util.Collections;
 import java.util.List;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
index afc7926..1bc17ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
@@ -14,11 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.Histogram3;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -27,10 +30,37 @@
   private final Timer1<String> executionTime;
   private final Histogram1<String> executionDelay;
   private final Histogram1<String> executionRetries;
+  private final Histogram3<Integer, String, String> slowProjectReplicationLatency;
 
   @Inject
-  ReplicationMetrics(MetricMaker metricMaker) {
-    Field<String> DEST_FIELD = Field.ofString("destination");
+  ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> DEST_FIELD =
+        Field.ofString(
+                "destination",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("destination", fieldValue)))
+            .build();
+
+    Field<String> PROJECT_FIELD =
+        Field.ofString(
+                "project",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("project", fieldValue)))
+            .build();
+
+    Field<Integer> SLOW_THRESHOLD_FIELD =
+        Field.ofInteger(
+                "slow_threshold",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(
+                            PluginMetadata.create("slow_threshold", fieldValue.toString())))
+            .build();
 
     executionTime =
         metricMaker.newTimer(
@@ -55,6 +85,17 @@
                 .setCumulative()
                 .setUnit("retries"),
             DEST_FIELD);
+
+    slowProjectReplicationLatency =
+        metricMaker.newHistogram(
+            "latency_slower_than_threshold" + "",
+            new Description(
+                    "latency for project to destination, where latency was slower than threshold")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SLOW_THRESHOLD_FIELD,
+            PROJECT_FIELD,
+            DEST_FIELD);
   }
 
   /**
@@ -63,7 +104,7 @@
    * @param name the destination name.
    * @return the timer context.
    */
-  Timer1.Context start(String name) {
+  Timer1.Context<String> start(String name) {
     return executionTime.start(name);
   }
 
@@ -78,4 +119,17 @@
     executionDelay.record(name, delay);
     executionRetries.record(name, retries);
   }
+
+  /**
+   * Record replication latency for project to destination, where latency was slower than
+   * threshold. Field values are recorded in (threshold, project, destination) order.
+   * @param destinationName the destination name.
+   * @param projectName the project name.
+   * @param slowThreshold the slow latency threshold that the replication exceeded.
+   * @param latency the observed replication latency in milliseconds.
+   */
+  void recordSlowProjectReplication(
+      String destinationName, String projectName, Integer slowThreshold, long latency) {
+    slowProjectReplicationLatency.record(slowThreshold, projectName, destinationName, latency);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 835d068..c2b96a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -23,18 +24,38 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.util.FS;
 
 class ReplicationModule extends AbstractModule {
+  private final SitePaths site;
+  private final Path cfgPath;
+
+  @Inject
+  public ReplicationModule(SitePaths site) {
+    this.site = site;
+    cfgPath = site.etc_dir.resolve("replication.config");
+  }
+
   @Override
   protected void configure() {
     install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(ObservableQueue.class).to(ReplicationQueue.class);
     bind(LifecycleListener.class)
         .annotatedWith(UniqueAnnotations.create())
         .to(ReplicationQueue.class);
@@ -57,7 +78,22 @@
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
 
-    bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+    bind(EventBus.class).in(Scopes.SINGLETON);
+    bind(ReplicationDestinations.class).to(DestinationsCollection.class);
+    bind(ConfigParser.class).to(DestinationConfigParser.class).in(Scopes.SINGLETON);
+
+    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+      bind(ReplicationConfig.class)
+          .annotatedWith(MainReplicationConfig.class)
+          .to(getReplicationConfigClass());
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class).in(Scopes.SINGLETON);
+      bind(LifecycleListener.class)
+          .annotatedWith(UniqueAnnotations.create())
+          .to(AutoReloadConfigDecorator.class);
+    } else {
+      bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
@@ -68,4 +104,22 @@
 
     bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
+
+  private FileBasedConfig getReplicationConfig() {
+    File replicationConfigFile = cfgPath.toFile();
+    FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+    }
+    return config;
+  }
+
+  private Class<? extends ReplicationConfig> getReplicationConfigClass() {
+    if (Files.exists(site.etc_dir.resolve("replication"))) {
+      return FanoutReplicationConfig.class;
+    }
+    return ReplicationFileBasedConfig.class;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index cf344f8..6a89e80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -18,15 +18,16 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
@@ -41,7 +42,8 @@
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
-    implements LifecycleListener,
+    implements ObservableQueue,
+        LifecycleListener,
         GitReferenceUpdatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
@@ -52,7 +54,7 @@
 
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
-  private final ReplicationConfig config;
+  private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
@@ -61,13 +63,13 @@
   @Inject
   ReplicationQueue(
       WorkQueue wq,
-      ReplicationConfig rc,
+      Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
     workQueue = wq;
     dispatcher = dis;
-    config = rc;
+    destinations = rd;
     stateLog = sl;
     replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
@@ -76,8 +78,9 @@
   @Override
   public void start() {
     if (!running) {
-      config.startup(workQueue);
+      destinations.get().startup(workQueue);
       running = true;
+      replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
     }
@@ -86,16 +89,18 @@
   @Override
   public void stop() {
     running = false;
-    int discarded = config.shutdown();
+    int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
   }
 
+  @Override
   public boolean isRunning() {
     return running;
   }
 
+  @Override
   public boolean isReplaying() {
     return replaying;
   }
@@ -107,48 +112,42 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
-      return;
-    }
-
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project)) {
-        for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(
-                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
-        }
-      }
-    }
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName());
   }
 
-  private void onGitReferenceUpdated(String projectName, String refName) {
+  private void fire(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    fire(Project.nameKey(projectName), null, refName, state, false);
+    state.markAllPushTasksScheduled();
+  }
+
+  private void fire(
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(projectName);
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      pushReference(cfg, project, refName, state);
+    for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
+      pushReference(cfg, project, urlMatch, refName, state, now);
     }
-    state.markAllPushTasksScheduled();
   }
 
   private void fire(URIish uri, Project.NameKey project, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    for (Destination dest : config.getDestinations(uri, project, refName)) {
+    for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
       dest.schedule(project, refName, uri, state);
     }
     state.markAllPushTasksScheduled();
@@ -156,26 +155,29 @@
 
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
-    pushReference(cfg, project, refName, null);
+    pushReference(cfg, project, null, refName, null, true);
   }
 
   private void pushReference(
-      Destination cfg, Project.NameKey project, String refName, ReplicationState state) {
+      Destination cfg,
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     boolean withoutState = state == null;
     if (withoutState) {
       state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     }
-
     if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-      for (URIish uri : cfg.getURIs(project, null)) {
-        replicationTasksStorage.persist(
+      for (URIish uri : cfg.getURIs(project, urlMatch)) {
+        replicationTasksStorage.create(
             new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
-        cfg.schedule(project, refName, uri, state);
+        cfg.schedule(project, refName, uri, state, now);
       }
     } else {
       repLog.debug("Skipping ref {} on project {}", refName, project.get());
     }
-
     if (withoutState) {
       state.markAllPushTasksScheduled();
     }
@@ -185,13 +187,13 @@
     replaying = true;
     try {
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         if (t == null) {
           repLog.warn("Encountered null replication event in ReplicationTasksStorage");
           continue;
         }
         try {
-          fire(new URIish(t.uri), new Project.NameKey(t.project), t.ref);
+          fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
         } catch (URISyntaxException e) {
           repLog.error("Encountered malformed URI for persisted event %s", t);
         }
@@ -203,15 +205,15 @@
 
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
         .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
   }
 
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
         .forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
   }
 
@@ -221,7 +223,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName());
         eventsReplayed.add(eventKey);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index aa965fe..28f6b6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,6 +38,6 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index b1fbb10..98bc40f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -24,15 +24,38 @@
 import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.nio.file.DirectoryIteratorException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under <replication_data>/ref-updates where replication_data is
+ * determined by the replication.eventsDirectory config option and defaults to
+ * <site_dir>/data/replication. Atomic renames must be supported from anywhere within the store to
+ * anywhere within the store. This generally means that all the contents of the store needs to live
+ * on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <p><code>
+ *   .../building/<tmp_name>  new replication tasks under construction
+ *   .../running/<sha1>       running replication tasks
+ *   .../waiting/<sha1>       outstanding replication tasks
+ * </code>
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
 @Singleton
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -45,6 +68,10 @@
     public final String uri;
     public final String remote;
 
+    public ReplicateRefUpdate(PushOne push, String ref) {
+      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    }
+
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
       this.project = project;
       this.ref = ref;
@@ -61,29 +88,20 @@
   private static final Gson GSON = new Gson();
 
   private final Path refUpdates;
+  private final Path buildingUpdates;
+  private final Path runningUpdates;
+  private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
     refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    buildingUpdates = refUpdates.resolve("building");
+    runningUpdates = refUpdates.resolve("running");
+    waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String persist(ReplicateRefUpdate r) {
-    String json = GSON.toJson(r) + "\n";
-    String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote;
-    String eventKey = sha1(key).name();
-    Path file = refUpdates().resolve(eventKey);
-
-    if (Files.exists(file)) {
-      return eventKey;
-    }
-
-    try {
-      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.write(file, json.getBytes(UTF_8));
-    } catch (IOException e) {
-      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
-    }
-    return eventKey;
+  public synchronized String create(ReplicateRefUpdate r) {
+    return new Task(r).create();
   }
 
   @VisibleForTesting
@@ -91,27 +109,57 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
+  @VisibleForTesting
   public void delete(ReplicateRefUpdate r) {
-    String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote;
-    String taskKey = sha1(key).name();
-    Path file = refUpdates().resolve(taskKey);
+    new Task(r).delete();
+  }
 
-    if (disableDeleteForTesting) {
-      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
-      return;
-    }
-
-    try {
-      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.delete(file);
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+  public synchronized void start(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).start();
     }
   }
 
-  public List<ReplicateRefUpdate> list() {
+  public synchronized void reset(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).reset();
+    }
+  }
+
+  public synchronized void resetAll() {
+    for (ReplicateRefUpdate r : listRunning()) {
+      new Task(r).reset();
+    }
+  }
+
+  public synchronized void finish(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).finish();
+    }
+  }
+
+  public synchronized List<ReplicateRefUpdate> listWaiting() {
+    return list(createDir(waitingUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> listRunning() {
+    return list(createDir(runningUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> listBuilding() {
+    return list(createDir(buildingUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> list() {
+    return list(createDir(refUpdates));
+  }
+
+  private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
       for (Path path : events) {
         if (Files.isRegularFile(path)) {
           try {
@@ -124,10 +172,17 @@
           } catch (IOException e) {
             logger.atSevere().withCause(e).log("Error when firing pending event %s", path);
           }
+        } else if (Files.isDirectory(path)) {
+          try {
+            results.addAll(list(path));
+          } catch (DirectoryIteratorException d) {
+            // iterating over the sub-directories is expected to have dirs disappear
+            Nfs.throwIfNotStaleFileHandle(d.getCause());
+          }
         }
       }
     } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error when firing pending events");
+      logger.atSevere().withCause(e).log("Error while listing tasks");
     }
     return results;
   }
@@ -137,11 +192,89 @@
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
-  private Path refUpdates() {
+  private static Path createDir(Path dir) {
     try {
-      return Files.createDirectories(refUpdates);
+      return Files.createDirectories(dir);
     } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+      throw new ProvisionException(String.format("Couldn't create %s", dir), e);
+    }
+  }
+
+  private class Task {
+    public final ReplicateRefUpdate update;
+    public final String json;
+    public final String taskKey;
+    public final Path running;
+    public final Path waiting;
+
+    public Task(ReplicateRefUpdate update) {
+      this.update = update;
+      json = GSON.toJson(update) + "\n";
+      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      taskKey = sha1(key).name();
+      running = createDir(runningUpdates).resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
+    }
+
+    public String create() {
+      if (Files.exists(waiting)) {
+        return taskKey;
+      }
+
+      try {
+        Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
+        logger.atFine().log("CREATE %s %s", tmp, updateLog());
+        Files.write(tmp, json.getBytes(UTF_8));
+        logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog());
+        rename(tmp, waiting);
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log("Couldn't create task %s", json);
+      }
+      return taskKey;
+    }
+
+    public void start() {
+      rename(waiting, running);
+    }
+
+    public void reset() {
+      rename(running, waiting);
+    }
+
+    public void finish() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", running, updateLog());
+        Files.delete(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+      }
+    }
+
+    public void delete() {
+      try {
+        Files.deleteIfExists(waiting);
+        Files.deleteIfExists(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+      }
+    }
+
+    private void rename(Path from, Path to) {
+      try {
+        logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index 70452b4..ffa6be1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 194238c..0aad73b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -142,6 +142,11 @@
 	persisted events will not be deleted. When the plugin is started again,
 	it will trigger all replications found under this directory.
 
+	For replication to work, it is important that atomic renames be possible
+	from within any subdirectory of the eventsDirectory to within any other
+	subdirectory of the eventsDirectory. This generally means that the entire
+	contents of the eventsDirectory should live on the same filesystem.
+
 	When not set, defaults to the plugin's data directory.
 
 remote.NAME.url
@@ -442,6 +447,88 @@
 	By default, replicates without matching, i.e. replicates
 	everything to all remotes.
 
+remote.NAME.slowLatencyThreshold
+:	the time duration after which the replication of a project to this
+	destination will be considered "slow". A slow project replication
+	will cause additional metrics to be exposed for further investigation.
+	See [metrics.md](metrics.md) for further details.
+
+	default: 15 minutes
+
+Directory `replication`
+--------------------
+The optional directory `$site_path/etc/replication` contains Git-style
+config files that control the replication settings for the replication
+plugin. When present, all `remote` sections from the `replication.config` file are
+ignored.
+
+Each file must contain exactly one `remote` section. Multiple `remote` sections or any
+other section make the file invalid, and it is skipped by the replication plugin.
+The file name defines the remote section name. Each section provides common configuration
+settings for one or more destination URLs. For more details on how to set up `remote`
+sections, please refer to the `replication.config` section.
+
+### Configuration example:
+
+Static configuration in `$site_path/etc/replication.config`:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+```
+
+Remote sections in `$site_path/etc/replication` directory:
+
+* File `$site_path/etc/replication/host-one.config`
+
+ ```
+ [remote]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+ ```
+
+
+* File `$site_path/etc/replication/pubmirror.config`
+
+ ```
+  [remote]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+ ```
+
+Replication plugin resolves config files to the following configuration:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+
+[remote "host-one"]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+
+[remote "pubmirror"]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    push = +refs/heads/*:refs/heads/*
+    push = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+```
+
 File `secure.config`
 --------------------
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..ef8b150
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,61 @@
+# Metrics
+
+Some metrics are emitted when replication occurs to a remote destination.
+The granularity of the metrics recorded is at destination level; however, when a particular project replication is flagged
+as slow, project-level metrics are emitted as well. This happens when the replication took longer than the allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md)).
+
+The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
+could potentially be considerably big.
+
+### Project level
+
+* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
+
+### Destination level
+
+* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
+* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
+* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
+
+### Example
+```
+# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_delay_destination summary
+plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0
+plugins_replication_replication_delay_destinationName_count 3.0
+
+# HELP plugins_replication_replication_retries_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_retries_destination summary
+plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0
+plugins_replication_replication_retries_destinationName_count 3.0
+
+# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer)
+# TYPE plugins_replication_replication_latency_destinationName summary
+plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002
+plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881
+plugins_replication_replication_latency_destinationName_count 2.0
+
+# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName_count 1.0
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 77dc1cc..2b6a8c4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -16,26 +16,28 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.file.Files.createTempDirectory;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+import com.google.inject.util.Providers;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Ignore
 public abstract class AbstractConfigTest {
@@ -43,6 +45,11 @@
   protected final SitePaths sitePaths;
   protected final Destination.Factory destinationFactoryMock;
   protected final Path pluginDataPath;
+  protected ReplicationQueue replicationQueueMock;
+  protected WorkQueue workQueueMock;
+  protected EventBus eventBus = new EventBus();
+  protected FakeExecutorService executorService = new FakeExecutorService();
+  protected ConfigParser configParser;
 
   static class FakeDestination extends Destination {
     public final DestinationConfiguration config;
@@ -53,11 +60,9 @@
     }
 
     private static Injector injectorMock() {
-      Injector injector = createNiceMock(Injector.class);
-      Injector childInjectorMock = createNiceMock(Injector.class);
-      expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock);
-      replay(childInjectorMock);
-      replay(injector);
+      Injector injector = mock(Injector.class);
+      Injector childInjectorMock = mock(Injector.class);
+      when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock);
       return injector;
     }
   }
@@ -66,21 +71,26 @@
     sitePath = createTempPath("site");
     sitePaths = new SitePaths(sitePath);
     pluginDataPath = createTempPath("data");
-    destinationFactoryMock = createMock(Destination.Factory.class);
+    destinationFactoryMock = mock(Destination.Factory.class);
+    configParser = new DestinationConfigParser();
   }
 
   @Before
   public void setup() {
-    expect(destinationFactoryMock.create(isA(DestinationConfiguration.class)))
-        .andAnswer(
-            new IAnswer<Destination>() {
+    when(destinationFactoryMock.create(any(DestinationConfiguration.class)))
+        .thenAnswer(
+            new Answer<Destination>() {
               @Override
-              public Destination answer() throws Throwable {
-                return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]);
+              public Destination answer(InvocationOnMock invocation) throws Throwable {
+                return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]);
               }
-            })
-        .anyTimes();
-    replay(destinationFactoryMock);
+            });
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+
+    workQueueMock = mock(WorkQueue.class);
+    when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService);
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
@@ -88,8 +98,12 @@
   }
 
   protected FileBasedConfig newReplicationConfig() {
+    return newReplicationConfig("replication.config");
+  }
+
+  protected FileBasedConfig newReplicationConfig(String path) {
     FileBasedConfig replicationConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+        new FileBasedConfig(sitePaths.etc_dir.resolve(path).toFile(), FS.DETECTED);
     return replicationConfig;
   }
 
@@ -113,4 +127,18 @@
 
     assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls);
   }
+
+  protected DestinationsCollection newDestinationsCollections(ReplicationConfig replicationConfig)
+      throws ConfigInvalidException {
+    return new DestinationsCollection(
+        destinationFactoryMock,
+        Providers.of(replicationQueueMock),
+        replicationConfig,
+        configParser,
+        eventBus);
+  }
+
+  protected ReplicationConfig newReplicationFileBasedConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 211cafa..b1b9453 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -15,192 +15,41 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 
-import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Provider;
 import com.google.inject.util.Providers;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.junit.Before;
 import org.junit.Test;
 
 public class AutoReloadConfigDecoratorTest extends AbstractConfigTest {
-  private AutoReloadConfigDecorator autoReloadConfig;
-  private ReplicationQueue replicationQueueMock;
-  private WorkQueue workQueueMock;
-  private FakeExecutorService executorService = new FakeExecutorService();
-
-  public class FakeExecutorService implements ScheduledExecutorService {
-    public Runnable refreshCommand;
-
-    @Override
-    public void shutdown() {}
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      return null;
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return false;
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return false;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-      return false;
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      return null;
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-      return null;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(
-        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return null;
-    }
-
-    @Override
-    public void execute(Runnable command) {}
-
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(
-        Runnable command, long initialDelay, long period, TimeUnit unit) {
-      refreshCommand = command;
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(
-        Runnable command, long initialDelay, long delay, TimeUnit unit) {
-      return null;
-    }
-  }
+  ReplicationConfig replicationConfig;
 
   public AutoReloadConfigDecoratorTest() throws IOException {
     super();
   }
 
-  @Override
-  @Before
-  public void setup() {
-    super.setup();
-
-    setupMocks();
-  }
-
-  private void setupMocks() {
-    replicationQueueMock = createNiceMock(ReplicationQueue.class);
-    expect(replicationQueueMock.isRunning()).andReturn(true);
-    replay(replicationQueueMock);
-
-    workQueueMock = createNiceMock(WorkQueue.class);
-    expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService);
-    replay(workQueueMock);
-  }
-
-  @Test
-  public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    String remoteName = "foo";
-    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
-    replicationConfig.setString("remote", remoteName, "url", remoteUrl);
-    replicationConfig.save();
-
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
-    assertThat(destinations).hasSize(1);
-    assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
-  }
-
   @Test
   public void shouldAutoReloadReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    replicationConfig.setBoolean("gerrit", null, "autoReload", true);
+    FileBasedConfig fileConfig = newReplicationConfig();
+    fileConfig.setBoolean("gerrit", null, "autoReload", true);
     String remoteName1 = "foo";
     String remoteUrl1 = "ssh://git@git.foo.com/${name}";
-    replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
-    replicationConfig.save();
+    fileConfig.setString("remote", remoteName1, "url", remoteUrl1);
+    fileConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    replicationConfig = newReplicationFileBasedConfig();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    newAutoReloadConfig(() -> newReplicationFileBasedConfig()).start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -208,45 +57,195 @@
 
     String remoteName2 = "bar";
     String remoteUrl2 = "ssh://git@git.bar.com/${name}";
-    replicationConfig.setString("remote", remoteName2, "url", remoteUrl2);
-    replicationConfig.save();
+    fileConfig.setString("remote", remoteName2, "url", remoteUrl2);
+    fileConfig.save();
     executorService.refreshCommand.run();
 
-    destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
 
   @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsAdded() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsRemoved() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "foobar";
+    String remoteUrl2 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    assertThat(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete())
+        .isTrue();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldAutoReloadFanoutReplicationConfigWhenConfigIsModified() throws Exception {
+    String remoteName1 = "foo";
+    String remoteUrl1 = "ssh://git@git.foo.com/${name}";
+    FileBasedConfig remoteConfig = newReplicationConfig("replication/" + remoteName1 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl1);
+    remoteConfig.save();
+
+    String remoteName2 = "bar";
+    String remoteUrl2 = "ssh://git@git.bar.com/${name}";
+    remoteConfig = newReplicationConfig("replication/" + remoteName2 + ".config");
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    replicationConfig = new FanoutReplicationConfig(sitePaths, pluginDataPath);
+
+    newAutoReloadConfig(
+            () -> {
+              try {
+                return new FanoutReplicationConfig(sitePaths, pluginDataPath);
+              } catch (IOException | ConfigInvalidException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+
+    TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
+
+    String remoteUrl3 = "ssh://git@git.foobar.com/${name}";
+    remoteConfig.setString("remote", null, "url", remoteUrl3);
+    remoteConfig.save();
+
+    executorService.refreshCommand.run();
+
+    destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl3);
+  }
+
+  @Test
   public void shouldNotAutoReloadReplicationConfigIfDisabled() throws Exception {
     String remoteName1 = "foo";
     String remoteUrl1 = "ssh://git@git.foo.com/${name}";
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    replicationConfig.setBoolean("gerrit", null, "autoReload", false);
-    replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
-    replicationConfig.save();
+    FileBasedConfig fileConfig = newReplicationConfig();
+    fileConfig.setBoolean("gerrit", null, "autoReload", false);
+    fileConfig.setString("remote", remoteName1, "url", remoteUrl1);
+    fileConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    replicationConfig = newReplicationFileBasedConfig();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections = newDestinationsCollections(replicationConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
     TimeUnit.SECONDS.sleep(1); // Allow the filesystem to change the update TS
 
-    replicationConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}");
-    replicationConfig.save();
+    fileConfig.setString("remote", "bar", "url", "ssh://git@git.bar.com/${name}");
+    fileConfig.save();
     executorService.refreshCommand.run();
 
-    assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations);
+    assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
+  }
+
+  /**
+   * Creates an {@link AutoReloadConfigDecorator} whose reload task obtains its configurations
+   * from {@code configSupplier}.
+   */
+  private AutoReloadConfigDecorator newAutoReloadConfig(
+      Supplier<ReplicationConfig> configSupplier) {
+    // Adapt the java.util.function.Supplier to the Guice Provider taken by AutoReloadRunnable.
+    Provider<ReplicationConfig> configProvider =
+        new Provider<ReplicationConfig>() {
+          @Override
+          public ReplicationConfig get() {
+            return configSupplier.get();
+          }
+        };
+    AutoReloadRunnable reloadTask =
+        new AutoReloadRunnable(
+            configParser, configProvider, eventBus, Providers.of(replicationQueueMock));
+
+    return new AutoReloadConfigDecorator(
+        "replication", workQueueMock, newReplicationFileBasedConfig(), reloadTask, eventBus);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
new file mode 100644
index 0000000..e7339d9
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -0,0 +1,139 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Provider;
+import com.google.inject.util.Providers;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks whether running an {@link AutoReloadRunnable} results in a reload notification reaching
+ * an {@link EventBus} subscriber, depending on whether the {@link ConfigParser} accepts the
+ * configuration.
+ */
+public class AutoReloadRunnableTest {
+
+  private SitePaths sitePaths;
+  private EventBus eventBus;
+  private ReloadTrackerSubscriber onReloadSubscriber;
+  private String pluginName;
+  private ReplicationQueue replicationQueueMock;
+
+  /** Builds a fresh site path, an event bus with a tracking subscriber, and a queue mock. */
+  @Before
+  public void setUp() throws IOException {
+    // Assigned first: it is the prefix of the temporary site path created just below.
+    pluginName = "replication";
+    // Reserve a unique name, then delete the file so only the unused path is handed to SitePaths.
+    Path tmp = Files.createTempFile(pluginName, "_site");
+    Files.deleteIfExists(tmp);
+    sitePaths = new SitePaths(tmp);
+    eventBus = new EventBus();
+    onReloadSubscriber = new ReloadTrackerSubscriber();
+    eventBus.register(onReloadSubscriber);
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+  }
+
+  @Test
+  public void configurationIsReloadedWhenParsingSucceeds() {
+    ConfigParser parser = new TestValidConfigurationListener();
+
+    attemptAutoReload(parser);
+
+    assertThat(onReloadSubscriber.reloaded).isTrue();
+  }
+
+  @Test
+  public void configurationIsNotReloadedWhenParsingFails() {
+    ConfigParser parser = new TestInvalidConfigurationListener();
+
+    attemptAutoReload(parser);
+
+    assertThat(onReloadSubscriber.reloaded).isFalse();
+  }
+
+  /** Runs a single auto-reload pass, synchronously, using the given parser. */
+  private void attemptAutoReload(ConfigParser parser) {
+    final AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            parser, newVersionConfigProvider(), eventBus, Providers.of(replicationQueueMock));
+
+    autoReloadRunnable.run();
+  }
+
+  /**
+   * Returns a provider of configurations that report the current {@link System#nanoTime()} as
+   * their version (presumably so that each reload attempt sees a new version; confirm against
+   * {@link AutoReloadRunnable}).
+   */
+  private Provider<ReplicationConfig> newVersionConfigProvider() {
+    return new Provider<ReplicationConfig>() {
+      @Override
+      public ReplicationConfig get() {
+        return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
+          @Override
+          public String getVersion() {
+            return String.format("%s", System.nanoTime());
+          }
+        };
+      }
+    };
+  }
+
+  /** Event-bus subscriber that records whether a list event was delivered to it. */
+  private static class ReloadTrackerSubscriber {
+    public boolean reloaded = false;
+
+    @Subscribe
+    public void onReload(
+        @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) {
+      reloaded = true;
+    }
+  }
+
+  /** Parser that accepts any configuration and reports no remotes. */
+  private static class TestValidConfigurationListener implements ConfigParser {
+    @Override
+    public List<RemoteConfiguration> parseRemotes(Config newConfig) {
+      return Collections.emptyList();
+    }
+  }
+
+  /** Parser that rejects every configuration. */
+  private static class TestInvalidConfigurationListener implements ConfigParser {
+    @Override
+    public List<RemoteConfiguration> parseRemotes(Config newConfig)
+        throws ConfigInvalidException {
+      throw new ConfigInvalidException("expected test failure");
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
new file mode 100644
index 0000000..2f7059e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
@@ -0,0 +1,128 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Inert {@link ScheduledExecutorService} for tests: no submitted task is ever run.
+ *
+ * <p>The only state it keeps is the command most recently passed to {@link
+ * #scheduleAtFixedRate}, exposed as {@link #refreshCommand} so that a test can run it by hand.
+ * List-returning methods yield empty lists; every other result is {@code null} or {@code false}.
+ */
+public class FakeExecutorService implements ScheduledExecutorService {
+  /** Last command given to {@link #scheduleAtFixedRate}; a no-op until one is scheduled. */
+  public Runnable refreshCommand = () -> {};
+
+  @Override
+  public void shutdown() {}
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return false;
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return null;
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return null;
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return null;
+  }
+
+  @Override
+  public void execute(Runnable command) {}
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    // Captured, not scheduled: tests invoke refreshCommand.run() themselves.
+    refreshCommand = command;
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return null;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
new file mode 100644
index 0000000..8cba4bb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfigTest.java
@@ -0,0 +1,286 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.io.MoreFiles;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FanoutReplicationConfigTest extends AbstractConfigTest {
+
+  public FanoutReplicationConfigTest() throws IOException {
+    super(); // presumably the superclass constructor can throw IOException — TODO confirm
+  }
+
+  String remoteName1 = "foo"; // name of the first test remote
+  String remoteUrl1 = "ssh://git@git.somewhere.com/${name}"; // URL of the first test remote
+  String remoteName2 = "bar"; // name of the second test remote
+  String remoteUrl2 = "ssh://git@git.elsewhere.com/${name}"; // URL of the second test remote
+
+  /**
+   * Saves the configuration returned by {@code newReplicationConfig()} before each test.
+   *
+   * @throws IOException if the configuration cannot be saved; JUnit reports it as a test error
+   */
+  @Before
+  public void setupTests() throws IOException {
+    newReplicationConfig().save();
+  }
+
+  /**
+   * A remote declared in the config obtained from {@code newReplicationConfig()} is expected to
+   * be skipped; only the remote stored in its own file under {@code replication/} is returned.
+   */
+  @Test
+  public void shouldSkipRemoteConfigFromReplicationConfig() throws Exception {
+    FileBasedConfig mainConfig = newReplicationConfig();
+    mainConfig.setString("remote", remoteName1, "url", remoteUrl1);
+    mainConfig.save();
+
+    FileBasedConfig remoteConfig = newRemoteConfig(remoteName2);
+    remoteConfig.setString("remote", null, "url", remoteUrl2);
+    remoteConfig.save();
+
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatIsDestination(destinations.get(0), remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldLoadDestinationsFromMultipleFiles() throws Exception {
+    // Two remotes, each saved to its own config file via newRemoteConfig().
+    FileBasedConfig fooConfig = newRemoteConfig(remoteName1);
+    fooConfig.setString("remote", null, "url", remoteUrl1);
+    fooConfig.save();
+
+    FileBasedConfig barConfig = newRemoteConfig(remoteName2);
+    barConfig.setString("remote", null, "url", remoteUrl2);
+    barConfig.save();
+
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(2);
+
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
+  }
+
+  @Test
+  public void shouldIgnoreDestinationsFromSubdirectories() throws Exception {
+    // A remote file placed directly under replication/ ...
+    FileBasedConfig topLevel = newRemoteConfig(remoteName1);
+    topLevel.setString("remote", null, "url", remoteUrl1);
+    topLevel.save();
+
+    // ... and a second one placed one directory level deeper.
+    FileBasedConfig nested = newRemoteConfig("subdirectory/" + remoteName2);
+    nested.setString("remote", null, "url", remoteUrl2);
+    nested.save();
+
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test
+  public void shouldIgnoreNonConfigFiles() throws Exception {
+    // A .config remote file next to a .yaml file that holds the same kind of content.
+    FileBasedConfig regularConfig = newRemoteConfig(remoteName1);
+    regularConfig.setString("remote", null, "url", remoteUrl1);
+    regularConfig.save();
+
+    FileBasedConfig yamlConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".yaml").toFile(),
+            FS.DETECTED);
+    yamlConfig.setString("remote", null, "url", remoteUrl2);
+    yamlConfig.save();
+
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).hasSize(1);
+
+    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
+  }
+
+  @Test(expected = ConfigInvalidException.class)
+  public void shouldThrowConfigInvalidExceptionWhenUrlIsMissingName() throws Exception {
+    FileBasedConfig missingPlaceholder = newRemoteConfig(remoteName1);
+    missingPlaceholder.setString("remote", null, "url", "ssh://git@git.elsewhere.com/name");
+    missingPlaceholder.save();
+    // The URL above has no ${name} placeholder; building the destinations is expected to fail.
+    newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath));
+  }
+
+  @Test
+  public void shouldIgnoreEmptyConfigFile() throws Exception {
+    // Saved without setting any value, so the file declares no remote at all.
+    newRemoteConfig(remoteName1).save();
+
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).isEmpty();
+  }
+
+  @Test
+  public void shouldIgnoreConfigWhenMoreThanOneRemoteInASingleFile() throws Exception {
+    // A single file that declares both an unnamed and a named remote section.
+    FileBasedConfig twoRemotes = newRemoteConfig(remoteName1);
+    twoRemotes.setString("remote", null, "url", remoteUrl1);
+    twoRemotes.setString("remote", remoteName2, "url", remoteUrl2);
+    twoRemotes.save();
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).isEmpty();
+  }
+
+  @Test
+  public void shouldIgnoreConfigRemoteSection() throws Exception {
+    // The url is stored under a "replication" section, so the file has no "remote" section.
+    FileBasedConfig noRemoteSection = newRemoteConfig(remoteName1);
+    noRemoteSection.setString("replication", null, "url", remoteUrl1);
+    noRemoteSection.save();
+    List<Destination> destinations =
+        newDestinationsCollections(new FanoutReplicationConfig(sitePaths, pluginDataPath))
+            .getAll(FilterType.ALL);
+    assertThat(destinations).isEmpty();
+  }
+
+  @Test
+  public void shouldReturnSameVersionWhenNoChanges() throws Exception {
+    // Save two remote files; nothing is modified after this point.
+    FileBasedConfig firstRemote = newRemoteConfig(remoteName1);
+    firstRemote.setString("remote", null, "url", remoteUrl1);
+    firstRemote.save();
+
+    FileBasedConfig secondRemote = newRemoteConfig(remoteName2);
+    secondRemote.setString("remote", null, "url", remoteUrl2);
+    secondRemote.save();
+
+    FanoutReplicationConfig original =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    String initialVersion = original.getVersion();
+
+    // A second instance created over the same files is compared against the first one.
+    FanoutReplicationConfig reloaded =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    assertThat(reloaded.getVersion()).isEqualTo(initialVersion);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileAdded() throws Exception {
+    // Start with a single remote file.
+    FileBasedConfig existingRemote = newRemoteConfig(remoteName1);
+    existingRemote.setString("remote", null, "url", remoteUrl1);
+    existingRemote.save();
+
+    FanoutReplicationConfig fanoutConfig =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    String versionBefore = fanoutConfig.getVersion();
+
+    // Save a second remote file after the version has been captured.
+    FileBasedConfig addedRemote = newRemoteConfig(remoteName2);
+    addedRemote.setString("remote", null, "url", remoteUrl2);
+    addedRemote.save();
+
+    assertThat(fanoutConfig.getVersion()).isNotEqualTo(versionBefore);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileIsModified() throws Exception {
+    // Start with a single remote file.
+    FileBasedConfig remoteFile = newRemoteConfig(remoteName1);
+    remoteFile.setString("remote", null, "url", remoteUrl1);
+    remoteFile.save();
+
+    FanoutReplicationConfig fanoutConfig =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    String versionBefore = fanoutConfig.getVersion();
+
+    // Save the same file again with a different URL.
+    remoteFile.setString("remote", null, "url", remoteUrl2);
+    remoteFile.save();
+
+    assertThat(fanoutConfig.getVersion()).isNotEqualTo(versionBefore);
+  }
+
+  @Test
+  public void shouldReturnNewVersionWhenConfigFileRemoved() throws Exception {
+    // Start with two remote files; the second one is deleted below.
+    FileBasedConfig keptRemote = newRemoteConfig(remoteName1);
+    keptRemote.setString("remote", null, "url", remoteUrl1);
+    keptRemote.save();
+
+    FileBasedConfig removedRemote = newRemoteConfig(remoteName2);
+    removedRemote.setString("remote", null, "url", remoteUrl2);
+    removedRemote.save();
+
+    FanoutReplicationConfig fanoutConfig =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    String versionBefore = fanoutConfig.getVersion();
+
+    boolean deleted =
+        sitePaths.etc_dir.resolve("replication/" + remoteName2 + ".config").toFile().delete();
+    assertThat(deleted).isTrue();
+
+    assertThat(fanoutConfig.getVersion()).isNotEqualTo(versionBefore);
+  }
+
+  @Test
+  public void shouldReturnReplicationConfigVersionWhenReplicationConfigDirectoryRemoved()
+      throws Exception {
+    // Start with two remote files inside the replication directory.
+    FileBasedConfig firstRemote = newRemoteConfig(remoteName1);
+    firstRemote.setString("remote", null, "url", remoteUrl1);
+    firstRemote.save();
+
+    FileBasedConfig secondRemote = newRemoteConfig(remoteName2);
+    secondRemote.setString("remote", null, "url", remoteUrl2);
+    secondRemote.save();
+
+    FanoutReplicationConfig fanoutConfig =
+        new FanoutReplicationConfig(sitePaths, pluginDataPath);
+    String expectedVersion =
+        new ReplicationFileBasedConfig(sitePaths, pluginDataPath).getVersion();
+
+    // Delete the whole replication directory, including both remote files.
+    MoreFiles.deleteRecursively(sitePaths.etc_dir.resolve("replication"), ALLOW_INSECURE);
+
+    assertThat(fanoutConfig.getVersion()).isEqualTo(expectedVersion);
+  }
+
+  protected FileBasedConfig newRemoteConfig(String configFileName) {
+    // Only builds the config object for replication/<configFileName>.config; callers save it.
+    String relativePath = "replication/" + configFileName + ".config";
+    return new FileBasedConfig(sitePaths.etc_dir.resolve(relativePath).toFile(), FS.DETECTED);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 0f6d629..2ee9a39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -14,11 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -36,14 +35,12 @@
 
   @Before
   public void setUp() throws Exception {
-    dispatcherMock = createMock(EventDispatcher.class);
-    replay(dispatcherMock);
+    dispatcherMock = mock(EventDispatcher.class);
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
   }
 
   @Test
   public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -51,9 +48,6 @@
             "someHost",
             RefPushResult.SUCCEEDED,
             RemoteRefUpdate.Status.OK);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -61,12 +55,11 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.SUCCEEDED,
         RemoteRefUpdate.Status.OK);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -74,9 +67,6 @@
             "someHost",
             RefPushResult.FAILED,
             RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -84,19 +74,15 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.FAILED,
         RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void onAllNodesReplicated() throws PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicationDoneEvent expectedDoneEvent =
         new RefReplicationDoneEvent("someProject", "refs/heads/master", 5);
-    dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c010ddb..c09fcd1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,19 +14,18 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.permissions.PermissionBackend;
@@ -45,8 +44,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import junit.framework.AssertionFailedError;
-import org.easymock.IAnswer;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
@@ -68,6 +65,8 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class PushOneTest {
   private static final int TEST_PUSH_TIMEOUT_SECS = 10;
@@ -86,7 +85,7 @@
   private IdGenerator idGeneratorMock;
   private ReplicationStateListeners replicationStateListenersMock;
   private ReplicationMetrics replicationMetricsMock;
-  private Timer1.Context timerContextMock;
+  private Timer1.Context<String> timerContextMock;
   private ProjectCache projectCacheMock;
   private TransportFactory transportFactoryMock;
   private Transport transportMock;
@@ -108,7 +107,7 @@
 
   @Before
   public void setup() throws Exception {
-    projectNameKey = new Project.NameKey("fooProject");
+    projectNameKey = Project.nameKey("fooProject");
     urish = new URIish("http://foo.com/fooProject.git");
 
     newLocalRef =
@@ -126,6 +125,7 @@
     setupMocks();
   }
 
+  @SuppressWarnings("unchecked")
   private void setupMocks() throws Exception {
     FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
     config.setString("remote", "Replication", "push", "foo");
@@ -134,8 +134,8 @@
     setupRepositoryMock(config);
     setupGitRepoManagerMock();
 
-    projectStateMock = createNiceMock(ProjectState.class);
-    forProjectMock = createNiceMock(ForProject.class);
+    projectStateMock = mock(ProjectState.class);
+    forProjectMock = mock(ForProject.class);
     setupWithUserMock();
     setupPermissionBackedMock();
 
@@ -144,46 +144,22 @@
     setupRefSpecMock();
     setupRemoteConfigMock();
 
-    credentialsFactory = createNiceMock(CredentialsFactory.class);
+    credentialsFactory = mock(CredentialsFactory.class);
 
     setupFetchConnectionMock();
     setupPushConnectionMock();
     setupRequestScopeMock();
-    idGeneratorMock = createNiceMock(IdGenerator.class);
-    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+    idGeneratorMock = mock(IdGenerator.class);
+    replicationStateListenersMock = mock(ReplicationStateListeners.class);
 
-    timerContextMock = createNiceMock(Timer1.Context.class);
+    timerContextMock = mock(Timer1.Context.class);
     setupReplicationMetricsMock();
 
     setupTransportMock();
 
     setupProjectCacheMock();
 
-    replicationConfigMock = createNiceMock(ReplicationConfig.class);
-
-    replay(
-        gitRepositoryManagerMock,
-        refUpdateMock,
-        repositoryMock,
-        permissionBackendMock,
-        destinationMock,
-        remoteConfigMock,
-        credentialsFactory,
-        threadRequestScoperMock,
-        idGeneratorMock,
-        replicationStateListenersMock,
-        replicationMetricsMock,
-        projectCacheMock,
-        timerContextMock,
-        transportFactoryMock,
-        projectStateMock,
-        withUserMock,
-        forProjectMock,
-        fetchConnection,
-        pushConnection,
-        refSpecMock,
-        refDatabaseMock,
-        replicationConfigMock);
+    replicationConfigMock = mock(ReplicationConfig.class);
   }
 
   @Test
@@ -212,10 +188,7 @@
 
     PushResult pushResult = new PushResult();
 
-    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
-        .andReturn(pushResult)
-        .once();
-    replay(transportMock);
+    when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult);
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
@@ -223,8 +196,6 @@
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
-
-    verify(transportMock);
   }
 
   @Test
@@ -241,12 +212,6 @@
               }
             });
 
-    // easymock way to check if method was never called
-    expect(transportMock.push(anyObject(), anyObject()))
-        .andThrow(new AssertionFailedError())
-        .anyTimes();
-    replay(transportMock);
-
     PushOne pushOne = createPushOne(replicationPushFilter);
 
     pushOne.addRef(PushOne.ALL_REFS);
@@ -254,7 +219,7 @@
 
     isCallFinished.await(10, TimeUnit.SECONDS);
 
-    verify(transportMock);
+    verify(transportMock, never()).push(any(), any());
   }
 
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
@@ -281,31 +246,31 @@
   }
 
   private void setupProjectCacheMock() throws IOException {
-    projectCacheMock = createNiceMock(ProjectCache.class);
-    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+    projectCacheMock = mock(ProjectCache.class);
+    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
-    transportMock = createNiceMock(Transport.class);
-    expect(transportMock.openFetch()).andReturn(fetchConnection);
-    transportFactoryMock = createNiceMock(TransportFactory.class);
-    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+    transportMock = mock(Transport.class);
+    when(transportMock.openFetch()).thenReturn(fetchConnection);
+    transportFactoryMock = mock(TransportFactory.class);
+    when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock);
   }
 
   private void setupReplicationMetricsMock() {
-    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
-    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+    replicationMetricsMock = mock(ReplicationMetrics.class);
+    when(replicationMetricsMock.start(any())).thenReturn(timerContextMock);
   }
 
   private void setupRequestScopeMock() {
-    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
-    expect(threadRequestScoperMock.scope(anyObject()))
-        .andAnswer(
-            new IAnswer<Callable<Object>>() {
+    threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class);
+    when(threadRequestScoperMock.scope(any()))
+        .thenAnswer(
+            new Answer<Callable<Object>>() {
               @SuppressWarnings("unchecked")
               @Override
-              public Callable<Object> answer() throws Throwable {
-                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+              public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
                 return new Callable<Object>() {
 
                   @Override
@@ -316,66 +281,64 @@
                   }
                 };
               }
-            })
-        .anyTimes();
+            });
   }
 
   private void setupPushConnectionMock() {
-    pushConnection = createNiceMock(PushConnection.class);
-    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+    pushConnection = mock(PushConnection.class);
+    when(pushConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupFetchConnectionMock() {
-    fetchConnection = createNiceMock(FetchConnection.class);
-    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+    fetchConnection = mock(FetchConnection.class);
+    when(fetchConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupRemoteConfigMock() {
-    remoteConfigMock = createNiceMock(RemoteConfig.class);
-    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+    remoteConfigMock = mock(RemoteConfig.class);
+    when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock));
   }
 
   private void setupRefSpecMock() {
-    refSpecMock = createNiceMock(RefSpec.class);
-    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
-    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
-    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
-    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+    refSpecMock = mock(RefSpec.class);
+    when(refSpecMock.matchSource(any(String.class))).thenReturn(true);
+    when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock);
+    when(refSpecMock.getDestination()).thenReturn("fooProject");
+    when(refSpecMock.isForceUpdate()).thenReturn(false);
   }
 
   private void setupDestinationMock() {
-    destinationMock = createNiceMock(Destination.class);
-    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+    destinationMock = mock(Destination.class);
+    when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed());
   }
 
   private void setupPermissionBackedMock() {
-    permissionBackendMock = createNiceMock(PermissionBackend.class);
-    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+    permissionBackendMock = mock(PermissionBackend.class);
+    when(permissionBackendMock.currentUser()).thenReturn(withUserMock);
   }
 
   private void setupWithUserMock() {
-    withUserMock = createNiceMock(WithUser.class);
-    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+    withUserMock = mock(WithUser.class);
+    when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock);
   }
 
   private void setupGitRepoManagerMock() throws IOException {
-    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
-    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+    gitRepositoryManagerMock = mock(GitRepositoryManager.class);
+    when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock);
   }
 
   private void setupRepositoryMock(FileBasedConfig config) throws IOException {
-    repositoryMock = createNiceMock(Repository.class);
-    refDatabaseMock = createNiceMock(RefDatabase.class);
-    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
-    expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
-    expect(refDatabaseMock.getRefs()).andReturn(localRefs);
-    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+    repositoryMock = mock(Repository.class);
+    refDatabaseMock = mock(RefDatabase.class);
+    when(repositoryMock.getConfig()).thenReturn(config);
+    when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock);
+    when(refDatabaseMock.getRefs()).thenReturn(localRefs);
+    when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock);
   }
 
   private void setupRefUpdateMock() {
-    refUpdateMock = createNiceMock(RefUpdate.class);
-    expect(refUpdateMock.getOldObjectId())
-        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
-        .anyTimes();
+    refUpdateMock = mock(RefUpdate.class);
+    when(refUpdateMock.getOldObjectId())
+        .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
deleted file mode 100644
index 983e97f..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicatedEventEquals implements IArgumentMatcher {
-
-  private RefReplicatedEvent expected;
-
-  public RefReplicatedEventEquals(RefReplicatedEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicatedEvent)) {
-      return false;
-    }
-    RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedEvent.ref)) {
-      return false;
-    }
-    if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) {
-      return false;
-    }
-    if (!equals(expected.status, actualRefReplicatedEvent.status)) {
-      return false;
-    }
-    if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and targetNode \"");
-    buffer.append(expected.targetNode);
-    buffer.append("\" and status \"");
-    buffer.append(expected.status);
-    buffer.append("\" and refStatus \"");
-    buffer.append(expected.refStatus);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
deleted file mode 100644
index d1284e1..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicationDoneEventEquals implements IArgumentMatcher {
-
-  private RefReplicationDoneEvent expected;
-
-  public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicationDoneEvent)) {
-      return false;
-    }
-    RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) {
-      return false;
-    }
-    if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and nodesCount \"");
-    buffer.append(expected.nodesCount);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
deleted file mode 100644
index 111a792..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import java.util.Collection;
-import java.util.Objects;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.eclipse.jgit.transport.RemoteRefUpdate;
-
-public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
-  Collection<RemoteRefUpdate> expectedRemoteRefs;
-
-  public static Collection<RemoteRefUpdate> eqRemoteRef(
-      Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
-    return null;
-  }
-
-  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    this.expectedRemoteRefs = expectedRemoteRefs;
-  }
-
-  @Override
-  public boolean matches(Object argument) {
-    if (!(argument instanceof Collection)) return false;
-
-    @SuppressWarnings("unchecked")
-    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
-
-    if (expectedRemoteRefs.size() != refs.size()) return false;
-    return refs.stream()
-        .allMatch(
-            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("expected:" + expectedRemoteRefs.toString());
-  }
-
-  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
-    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
-        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
-        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
-        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
-        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
-        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
-        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
-        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
new file mode 100644
index 0000000..adb7fc8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -0,0 +1,302 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.io.MoreFiles;
+import com.google.common.io.RecursiveDeleteOption;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Integration tests for replication when each remote is declared in its own
+ * {@code etc/replication/<remote>.config} file.
+ *
+ * <p>Set-up calls {@code disableDeleteForTesting(true)} on the task storage, and the tests assert
+ * on the tasks returned by {@link ReplicationTasksStorage#list()}.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationFanoutIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  /**
+   * Saves {@code replication.config} with auto-reload enabled plus one per-remote file, runs the
+   * superclass set-up, then resolves the task storage and deletes any stored task files.
+   */
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setAutoReload();
+    config.save();
+
+    setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project"));
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    tasksStorage.disableDeleteForTesting(true);
+  }
+
+  /** Removes the {@code etc/replication} directory written by the tests, if it exists. */
+  @After
+  public void cleanUp() throws IOException {
+    Path fanoutDir = sitePaths.etc_dir.resolve("replication");
+    if (Files.exists(fanoutDir)) {
+      MoreFiles.deleteRecursively(fanoutDir, RecursiveDeleteOption.ALLOW_INSECURE);
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject(project + "replica1");
+    Project.NameKey targetProject2 = createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
+
+    // The delay override has to be written to each remote's own file and saved: setting it on the
+    // in-memory replication.config without saving would never reach the reloaded configuration.
+    FileBasedConfig foo1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    FileBasedConfig foo2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    foo1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    foo2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    foo1.save();
+    foo2.save();
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    // NOTE(review): rewrites both remote files with TEST_REPLICATION_DELAY; presumably a reset for
+    // later tests, although cleanUp() deletes the directory anyway -- confirm before removing.
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  /** Returns the ref named exactly {@code branchName} in {@code repo}, or {@code null}. */
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  /** Like {@link #getRef}, but logs any exception and returns {@code null} instead of throwing. */
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return getRef(repo, branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  /** Single-suffix convenience form of the list-based overload. */
+  private FileBasedConfig setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    return setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  /**
+   * Writes {@code etc/replication/<remoteName>.config} with one URL per suffix and returns the
+   * saved config so that callers can override further settings and save it again.
+   */
+  private FileBasedConfig setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    FileBasedConfig remoteConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(),
+            FS.DETECTED);
+
+    setReplicationDestination(remoteConfig, replicaSuffixes, project);
+    return remoteConfig;
+  }
+
+  private void setAutoReload() throws IOException {
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  /**
+   * Fills the unnamed {@code remote} section of {@code remoteConfig} with the replica URLs, the
+   * test replication delay and, when present, the {@code projects} value, then saves it.
+   */
+  private void setReplicationDestination(
+      FileBasedConfig remoteConfig, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    remoteConfig.setStringList("remote", null, "url", replicaUrls);
+    remoteConfig.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> remoteConfig.setString("remote", null, "projects", prj));
+
+    remoteConfig.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  /** Returns the stored tasks whose ref fully matches {@code refRegex}. */
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  /**
+   * Recursively deletes every non-directory entry below {@code basePath}, leaving directories.
+   *
+   * @throws IOException if a directory cannot be listed or an entry cannot be deleted
+   */
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
+      for (Path path : files) {
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          Files.delete(path);
+        }
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index 36cc209..efacae7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
 import java.util.List;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Test;
 
@@ -37,8 +36,10 @@
     config.setString("remote", remoteName, "url", remoteUrl);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
 
     assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
@@ -55,18 +56,13 @@
     config.setString("remote", remoteName2, "url", remoteUrl2);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
 
-    assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
-    assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
-  }
-
-  private ReplicationFileBasedConfig newReplicationFileBasedConfig()
-      throws ConfigInvalidException, IOException {
-    ReplicationFileBasedConfig replicationConfig =
-        new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath);
-    return replicationConfig;
+    assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
+    assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 0074075..31cd75d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -25,13 +25,13 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Key;
@@ -106,7 +106,7 @@
 
     assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
 
-    waitUntil(() -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica")));
+    waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -149,7 +149,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
 
@@ -198,7 +198,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
 
@@ -329,6 +329,8 @@
   }
 
   private void replicateBranchDeletion(boolean mirror) throws Exception {
+    tasksStorage.disableDeleteForTesting(false);
+
     setReplicationDestination("foo", "replica", ALL_PROJECTS, mirror);
     reloadConfig();
 
@@ -371,10 +373,10 @@
     setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThrows(
         InterruptedException.class,
@@ -397,10 +399,10 @@
     reloadConfig();
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -422,7 +424,7 @@
     replicationQueueStart();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -433,6 +435,20 @@
   }
 
   @Test
+  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
+    tasksStorage.disableDeleteForTesting(false);
+    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(tasksStorage.listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
+
+    waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
+    waitUntil(() -> tasksStorage.listRunning().size() == 0);
+  }
+
+  @Test
   public void shouldFirePendingOnlyToStoredUri() throws Exception {
     String suffix1 = "replica1";
     String suffix2 = "replica2";
@@ -444,7 +460,7 @@
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
     reloadConfig();
 
-    String changeRef = createChange().getPatchSet().getRefName();
+    String changeRef = createChange().getPatchSet().refName();
 
     tasksStorage.disableDeleteForTesting(false);
     changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
@@ -531,6 +547,7 @@
     config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
     config.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
 
@@ -545,11 +562,11 @@
   }
 
   private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().forceReload();
+    getAutoReloadConfigDecoratorInstance().reload();
   }
 
-  private void shutdownConfig() {
-    getAutoReloadConfigDecoratorInstance().shutdown();
+  private void shutdownDestinations() {
+    getInstance(DestinationsCollection.class).shutdown();
   }
 
   private void replicationQueueStart() {
@@ -590,10 +607,18 @@
         .collect(toList());
   }
 
-  private void cleanupReplicationTasks() throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
       for (Path path : files) {
-        path.toFile().delete();
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
       }
     }
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..e0de577 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -15,11 +15,9 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.resetToDefault;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
 import java.net.URISyntaxException;
@@ -35,8 +33,7 @@
 
   @Before
   public void setUp() throws Exception {
-    pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
-    replay(pushResultProcessingMock);
+    pushResultProcessingMock = mock(PushResultProcessing.class);
     replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
@@ -53,52 +50,36 @@
 
   @Test
   public void shouldFireOneReplicationEventWhenNothingToReplicate() {
-    resetToDefault(pushResultProcessingMock);
-
-    // expected event
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected event
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://someHost/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.markAllPushTasksScheduled();
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://someHost1/someRepo.git");
     URIish uri2 = new URIish("git://someHost2/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.increasePushTaskCount("someProject", "someRef");
@@ -107,33 +88,29 @@
         "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject",
+            "someRef",
+            uri2,
+            RefPushResult.FAILED,
+            RemoteRefUpdate.Status.NON_EXISTING);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
     URIish uri2 = new URIish("git://host2/someRepo.git");
     URIish uri3 = new URIish("git://host3/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref1");
@@ -151,24 +128,32 @@
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5);
   }
 
   @Test
   public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("project1", "ref1");
     replicationState.increasePushTaskCount("project2", "ref2");
@@ -177,25 +162,24 @@
         "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events: one per-node notification per ref, then the per-ref and overall totals
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref2");
@@ -204,7 +188,17 @@
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected events: one per-node notification per ref, then the per-ref and overall totals
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test