Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  ReplicationQueue: Check nulls in firePendingEvents
  Log stack trace when an error occurs while deleting event
  Append LF to the json string of persisted replication event
  Change default for the replicateOnStartup to false
  Don't lose ref-updated events on plugin restart

Change-Id: I505f3769fe89720533e93c197ffe791eda82e848
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index acbf763..f3d2103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 
 public interface AdminApi {
   public boolean createProject(Project.NameKey project, String head);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 945f869..4c07f40 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,127 +14,45 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.FileUtil;
-import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.transport.URIish;
 
 @Singleton
-public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener {
   private static final long RELOAD_DELAY = 120;
   private static final long RELOAD_INTERVAL = 60;
 
   private volatile ReplicationFileBasedConfig currentConfig;
-  private long currentConfigTs;
-  private long lastFailedConfigTs;
 
-  private final SitePaths site;
-  private final Destination.Factory destinationFactory;
-  private final Path pluginDataDir;
-  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
-  // ReplicationConfig
-  private final Provider<ReplicationQueue> replicationQueue;
   private final ScheduledExecutorService autoReloadExecutor;
   private ScheduledFuture<?> autoReloadRunnable;
-
-  private volatile boolean shuttingDown;
+  private final AutoReloadRunnable reloadRunner;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site,
-      Destination.Factory destinationFactory,
-      Provider<ReplicationQueue> replicationQueue,
-      @PluginData Path pluginDataDir,
       @PluginName String pluginName,
-      WorkQueue workQueue)
-      throws ConfigInvalidException, IOException {
-    this.site = site;
-    this.destinationFactory = destinationFactory;
-    this.pluginDataDir = pluginDataDir;
-    this.currentConfig = loadConfig();
-    this.currentConfigTs = getLastModified(currentConfig);
-    this.replicationQueue = replicationQueue;
+      WorkQueue workQueue,
+      ReplicationFileBasedConfig replicationConfig,
+      AutoReloadRunnable reloadRunner,
+      EventBus eventBus) {
+    this.currentConfig = replicationConfig;
     this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config");
-  }
-
-  private static long getLastModified(ReplicationFileBasedConfig cfg) {
-    return FileUtil.lastModified(cfg.getCfgPath());
-  }
-
-  private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
-  }
-
-  private synchronized boolean isAutoReload() {
-    return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false);
-  }
-
-  @Override
-  public synchronized List<Destination> getDestinations(FilterType filterType) {
-    return currentConfig.getDestinations(filterType);
-  }
-
-  @Override
-  public synchronized Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    return currentConfig.getURIs(remoteName, projectName, filterType);
-  }
-
-  private synchronized void reloadIfNeeded() {
-    reload(false);
+    this.reloadRunner = reloadRunner;
+    eventBus.register(this);
   }
 
   @VisibleForTesting
-  public void forceReload() {
-    reload(true);
-  }
-
-  private void reload(boolean force) {
-    if (force || isAutoReload()) {
-      ReplicationQueue queue = replicationQueue.get();
-
-      long lastModified = getLastModified(currentConfig);
-      try {
-        if (force
-            || (!shuttingDown
-                && lastModified > currentConfigTs
-                && lastModified > lastFailedConfigTs
-                && queue.isRunning()
-                && !queue.isReplaying())) {
-          queue.stop();
-          currentConfig = loadConfig();
-          currentConfigTs = lastModified;
-          lastFailedConfigTs = 0;
-          logger.atInfo().log(
-              "Configuration reloaded: %d destinations",
-              currentConfig.getDestinations(FilterType.ALL).size());
-        }
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log(
-            "Cannot reload replication configuration: keeping existing settings");
-        lastFailedConfigTs = lastModified;
-        return;
-      } finally {
-        queue.start();
-      }
-    }
+  public void reload() {
+    reloadRunner.reload();
   }
 
   @Override
@@ -153,45 +71,36 @@
   }
 
   @Override
-  public synchronized boolean isEmpty() {
-    return currentConfig.isEmpty();
-  }
-
-  @Override
   public Path getEventsDirectory() {
     return currentConfig.getEventsDirectory();
   }
 
-  /* shutdown() cannot be set as a synchronized method because
-   * it may need to wait for pending events to complete;
-   * e.g. when enabling the drain of replication events before
-   * shutdown.
-   *
-   * As a rule of thumb for synchronized methods, because they
-   * implicitly define a critical section and associated lock,
-   * they should never hold waiting for another resource, otherwise
-   * the risk of deadlock is very high.
-   *
-   * See more background about deadlocks, what they are and how to
-   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
-   */
   @Override
-  public int shutdown() {
-    this.shuttingDown = true;
-    if (autoReloadRunnable != null) {
-      autoReloadRunnable.cancel(false);
-      autoReloadRunnable = null;
-    }
-    return currentConfig.shutdown();
+  public synchronized void start() {
+    autoReloadRunnable =
+        autoReloadExecutor.scheduleAtFixedRate(
+            reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
 
   @Override
-  public synchronized void startup(WorkQueue workQueue) {
-    shuttingDown = false;
-    currentConfig.startup(workQueue);
-    autoReloadRunnable =
-        autoReloadExecutor.scheduleAtFixedRate(
-            this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
+  public synchronized void stop() {
+    if (autoReloadRunnable != null) {
+      if (!autoReloadRunnable.cancel(true)) {
+        throw new IllegalStateException(
+            "Unable to cancel replication reload task: cannot guarantee orderly shutdown");
+      }
+      autoReloadRunnable = null;
+    }
+  }
+
+  @Override
+  public String getVersion() {
+    return currentConfig.getVersion();
+  }
+
+  @Subscribe
+  public void onReload(ReplicationFileBasedConfig newConfig) {
+    currentConfig = newConfig;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
new file mode 100644
index 0000000..a1084a8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.nio.file.Path;
+import java.util.List;
+
+/**
+ * Task that reloads the replication configuration when the version of the loaded one changes.
+ *
+ * <p>{@link #run()} is the guarded entry point: it does nothing while the version is unchanged,
+ * while it equals the version that last failed to load, or while the replication queue is not
+ * running or is replaying. {@link #reload()} performs the reload without those checks.
+ */
+public class AutoReloadRunnable implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final SitePaths site;
+  private final Path pluginDataDir;
+  private final EventBus eventBus;
+  private final Provider<ObservableQueue> queueObserverProvider;
+  private final ReplicationConfigValidator configValidator;
+
+  private ReplicationFileBasedConfig loadedConfig;
+  // Version of loadedConfig, captured when it was loaded.
+  private String loadedConfigVersion;
+  // Version that was pending when the last reload attempt failed; empty after a success.
+  private String lastFailedConfigVersion;
+
+  @Inject
+  public AutoReloadRunnable(
+      ReplicationConfigValidator configValidator,
+      ReplicationFileBasedConfig config,
+      SitePaths site,
+      @PluginData Path pluginDataDir,
+      EventBus eventBus,
+      Provider<ObservableQueue> queueObserverProvider) {
+    this.loadedConfig = config;
+    this.loadedConfigVersion = config.getVersion();
+    this.lastFailedConfigVersion = "";
+    this.site = site;
+    this.pluginDataDir = pluginDataDir;
+    this.eventBus = eventBus;
+    this.queueObserverProvider = queueObserverProvider;
+    this.configValidator = configValidator;
+  }
+
+  /**
+   * Triggers {@link #reload()} unless the configuration version is unchanged, already failed to
+   * load, or the replication queue is stopped or replaying.
+   */
+  @Override
+  public synchronized void run() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    ObservableQueue queue = queueObserverProvider.get();
+    if (pendingConfigVersion.equals(loadedConfigVersion)
+        || pendingConfigVersion.equals(lastFailedConfigVersion)
+        || !queue.isRunning()
+        || queue.isReplaying()) {
+      return;
+    }
+
+    reload();
+  }
+
+  /**
+   * Loads and validates a fresh configuration, then publishes it on the event bus.
+   *
+   * <p>On success both the new configuration and its validated remote configurations are posted.
+   * On any failure the error is logged, the current settings are kept and the pending version is
+   * remembered so that {@link #run()} does not retry it.
+   */
+  synchronized void reload() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    try {
+      ReplicationFileBasedConfig newConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+      final List<RemoteConfiguration> newValidDestinations =
+          configValidator.validateConfig(newConfig);
+      loadedConfig = newConfig;
+      loadedConfigVersion = newConfig.getVersion();
+      lastFailedConfigVersion = "";
+      // AutoReloadConfigDecorator subscribes to ReplicationFileBasedConfig rather than to the
+      // destination list, so the configuration itself has to be posted for it to be refreshed.
+      eventBus.post(newConfig);
+      eventBus.post(newValidDestinations);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot reload replication configuration: keeping existing settings");
+      lastFailedConfigVersion = pendingConfigVersion;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index a8dede3..fa26e82 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
@@ -31,7 +31,7 @@
   }
 
   private final RemoteConfig config;
-  private final ReplicationConfig replicationConfig;
+  private final DestinationsCollection destinations;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
   private final Project.NameKey project;
   private final String head;
@@ -39,21 +39,20 @@
   @Inject
   CreateProjectTask(
       RemoteConfig config,
-      ReplicationConfig replicationConfig,
+      DestinationsCollection destinations,
       DynamicItem<AdminApiFactory> adminApiFactory,
       @Assisted Project.NameKey project,
       @Assisted String head) {
     this.config = config;
-    this.replicationConfig = replicationConfig;
+    this.destinations = destinations;
     this.adminApiFactory = adminApiFactory;
     this.project = project;
     this.head = head;
   }
 
   public boolean create() {
-    return replicationConfig
-        .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values()
-        .stream()
+    return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION)
+        .values().stream()
         .map(u -> createProject(u, project, head))
         .reduce(true, (a, b) -> a && b);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index f9b2ad7..4617672 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 679776f..92b07ed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,13 +26,13 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.reviewdb.client.AccountGroup;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.PluginUser;
 import com.google.gerrit.server.account.GroupBackend;
@@ -60,7 +60,6 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -522,6 +521,7 @@
         pending.put(uri, pushOp);
         switch (reason) {
           case COLLISION:
+            replicationTasksStorage.get().reset(pushOp);
             @SuppressWarnings("unused")
             ScheduledFuture<?> ignored =
                 pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
@@ -536,6 +536,7 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
+              replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -562,36 +563,19 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      replicationTasksStorage.get().start(op);
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne task) {
+  void notifyFinished(PushOne op) {
     synchronized (stateLock) {
-      inFlight.remove(task.getURI());
-      if (!task.wasCanceled()) {
-        for (String ref : task.getRefs()) {
-          if (!refHasPendingPush(task.getURI(), ref)) {
-            replicationTasksStorage
-                .get()
-                .delete(
-                    new ReplicateRefUpdate(
-                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
-          }
-        }
-      }
+      replicationTasksStorage.get().finish(op);
+      inFlight.remove(op.getURI());
     }
   }
 
-  private boolean refHasPendingPush(URIish opUri, String ref) {
-    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
-  }
-
-  private boolean pushContainsRef(PushOne op, String ref) {
-    return op != null && op.getRefs().contains(ref);
-  }
-
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
@@ -607,27 +591,16 @@
   }
 
   boolean isSingleProjectMatch() {
-    List<String> projects = config.getProjects();
-    boolean ret = (projects.size() == 1);
-    if (ret) {
-      String projectMatch = projects.get(0);
-      if (ReplicationFilter.getPatternType(projectMatch)
-          != ReplicationFilter.PatternType.EXACT_MATCH) {
-        // projectMatch is either regular expression, or wild-card.
-        //
-        // Even though they might refer to a single project now, they need not
-        // after new projects have been created. Hence, we do not treat them as
-        // matching a single project.
-        ret = false;
-      }
-    }
-    return ret;
+    return config.isSingleProjectMatch();
   }
 
   boolean wouldPushRef(String ref) {
     if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
       return false;
     }
+    if (PushOne.ALL_REFS.equals(ref)) {
+      return true;
+    }
     for (RefSpec s : config.getRemoteConfig().getPushRefSpecs()) {
       if (s.matchSource(ref)) {
         return true;
@@ -666,7 +639,7 @@
         } else if (!remoteNameStyle.equals("slash")) {
           repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
         }
-        String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
+        String replacedPath = replaceName(uri.getPath(), name, config.isSingleProjectMatch());
         if (replacedPath != null) {
           uri = uri.setPath(replacedPath);
           r.add(uri);
@@ -731,6 +704,10 @@
     return config.getDelay() * 1000;
   }
 
+  int getSlowLatencyThreshold() {
+    return config.getSlowLatencyThreshold();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
@@ -750,7 +727,7 @@
       ReplicationScheduledEvent event =
           new ReplicationScheduledEvent(project.get(), ref, targetNode);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
@@ -764,7 +741,7 @@
       RefReplicatedEvent event =
           new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index f688cfc..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -16,13 +16,16 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-public class DestinationConfiguration {
+public class DestinationConfiguration implements RemoteConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
+  private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,7 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
+  private final int slowLatencyThreshold;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -67,16 +71,29 @@
     maxRetries =
         getInt(
             remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+
+    slowLatencyThreshold =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "remote",
+                remoteConfig.getName(),
+                "slowLatencyThreshold",
+                DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+                TimeUnit.SECONDS);
   }
 
+  @Override
   public int getDelay() {
     return delay;
   }
 
+  @Override
   public int getRescheduleDelay() {
     return rescheduleDelay;
   }
 
+  @Override
   public int getRetryDelay() {
     return retryDelay;
   }
@@ -93,26 +110,32 @@
     return lockErrorMaxRetries;
   }
 
+  @Override
   public ImmutableList<String> getUrls() {
     return urls;
   }
 
+  @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
   }
 
+  @Override
   public ImmutableList<String> getProjects() {
     return projects;
   }
 
+  @Override
   public ImmutableList<String> getAuthGroupNames() {
     return authGroupNames;
   }
 
+  @Override
   public String getRemoteNameStyle() {
     return remoteNameStyle;
   }
 
+  @Override
   public boolean replicatePermissions() {
     return replicatePermissions;
   }
@@ -129,10 +152,12 @@
     return replicateHiddenProjects;
   }
 
+  @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
   }
 
+  @Override
   public int getMaxRetries() {
     return maxRetries;
   }
@@ -140,4 +165,9 @@
   private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
     return cfg.getInt("remote", rc.getName(), name, defValue);
   }
+
+  @Override
+  public int getSlowLatencyThreshold() {
+    return slowLatencyThreshold;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
new file mode 100644
index 0000000..2f210e0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -0,0 +1,348 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Factory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class DestinationsCollection implements ReplicationDestinations, ReplicationConfigValidator {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Factory destinationFactory;
+  private final Provider<ReplicationQueue> replicationQueue;
+  private volatile List<Destination> destinations;
+  private boolean shuttingDown;
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
+  @Inject
+  public DestinationsCollection(
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
+      ReplicationFileBasedConfig replicationConfig,
+      EventBus eventBus)
+      throws ConfigInvalidException {
+    this.destinationFactory = destinationFactory;
+    this.replicationQueue = replicationQueue;
+    this.destinations = allDestinations(destinationFactory, validateConfig(replicationConfig));
+    eventBus.register(this);
+  }
+
+  @Override
+  public Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
+    if (getAll(filterType).isEmpty()) {
+      return ImmutableMultimap.of();
+    }
+
+    SetMultimap<Destination, URIish> uris = HashMultimap.create();
+    for (Destination config : getAll(filterType)) {
+      if (!config.wouldPushProject(projectName)) {
+        continue;
+      }
+
+      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
+        continue;
+      }
+
+      boolean adminURLUsed = false;
+
+      for (String url : config.getAdminUrls()) {
+        if (Strings.isNullOrEmpty(url)) {
+          continue;
+        }
+
+        URIish uri;
+        try {
+          uri = new URIish(url);
+        } catch (URISyntaxException e) {
+          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          continue;
+        }
+
+        if (!isGerrit(uri) && !isGerritHttp(uri)) {
+          String path =
+              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
+          if (path == null) {
+            repLog.warn("adminURL {} does not contain ${name}", uri);
+            continue;
+          }
+
+          uri = uri.setPath(path);
+          if (!isSSH(uri)) {
+            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            continue;
+          }
+        }
+        uris.put(config, uri);
+        adminURLUsed = true;
+      }
+
+      if (!adminURLUsed) {
+        for (URIish uri : config.getURIs(projectName, "*")) {
+          uris.put(config, uri);
+        }
+      }
+    }
+    return uris;
+  }
+
+  @Override
+  public synchronized List<Destination> getAll(FilterType filterType) {
+    Predicate<? super Destination> filter;
+    switch (filterType) {
+      case PROJECT_CREATION:
+        filter = dest -> dest.isCreateMissingRepos();
+        break;
+      case PROJECT_DELETION:
+        filter = dest -> dest.isReplicateProjectDeletions();
+        break;
+      case ALL:
+      default:
+        filter = dest -> true;
+        break;
+    }
+    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return destinations.isEmpty();
+  }
+
+  @Override
+  public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
+    for (Destination cfg : destinations) {
+      cfg.start(workQueue);
+    }
+  }
+
+  /* shutdown() cannot be declared as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when the drain of replication events before shutdown is
+   * enabled. Only the update of the shuttingDown flag is guarded.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never block waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    synchronized (this) {
+      shuttingDown = true;
+    }
+
+    int discarded = 0;
+    for (Destination cfg : destinations) {
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
+    }
+    return discarded;
+  }
+
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    for (int left = drainQueueAttempts; left > 0 && (inFlight > 0 || pending > 0); left--) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        // The delay is expressed in seconds while Thread.sleep() expects milliseconds.
+        java.util.concurrent.TimeUnit.SECONDS.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+    }
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  @Subscribe
+  public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
+    if (shuttingDown) {
+      logger.atWarning().log("Shutting down: configuration reload ignored");
+      return;
+    }
+
+    try {
+      replicationQueue.get().stop();
+      destinations = allDestinations(destinationFactory, remoteConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
+    } finally {
+      replicationQueue.get().start();
+    }
+  }
+
+  @Override
+  public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig replicationConfig)
+      throws ConfigInvalidException {
+    if (!replicationConfig.getConfig().getFile().exists()) {
+      logger.atWarning().log(
+          "Config file %s does not exist; not replicating",
+          replicationConfig.getConfig().getFile());
+      return Collections.emptyList();
+    }
+    if (replicationConfig.getConfig().getFile().length() == 0) {
+      logger.atInfo().log(
+          "Config file %s is empty; not replicating", replicationConfig.getConfig().getFile());
+      return Collections.emptyList();
+    }
+
+    try {
+      replicationConfig.getConfig().load();
+    } catch (ConfigInvalidException e) {
+      throw new ConfigInvalidException(
+          String.format(
+              "Config file %s is invalid: %s",
+              replicationConfig.getConfig().getFile(), e.getMessage()),
+          e);
+    } catch (IOException e) {
+      throw new ConfigInvalidException(
+          String.format(
+              "Cannot read %s: %s", replicationConfig.getConfig().getFile(), e.getMessage()),
+          e);
+    }
+
+    boolean defaultForceUpdate =
+        replicationConfig.getConfig().getBoolean("gerrit", "defaultForceUpdate", false);
+
+    ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
+    for (RemoteConfig c : allRemotes(replicationConfig.getConfig())) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // If destination for push is not set assume equal to source.
+      for (RefSpec ref : c.getPushRefSpecs()) {
+        if (ref.getDestination() == null) {
+          ref.setDestination(ref.getSource());
+        }
+      }
+
+      if (c.getPushRefSpecs().isEmpty()) {
+        c.addPushRefSpec(
+            new RefSpec()
+                .setSourceDestination("refs/*", "refs/*")
+                .setForceUpdate(defaultForceUpdate));
+      }
+
+      DestinationConfiguration destinationConfiguration =
+          new DestinationConfiguration(c, replicationConfig.getConfig());
+
+      if (!destinationConfiguration.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format(
+                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
+                    c.getName(), u, replicationConfig.getConfig().getFile()));
+          }
+        }
+      }
+
+      confs.add(destinationConfiguration);
+    }
+
+    return confs.build();
+  }
+
+  private List<Destination> allDestinations(
+      Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
+
+    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
+    for (RemoteConfiguration c : remoteConfigurations) {
+      if (c instanceof DestinationConfiguration) {
+        dest.add(destinationFactory.create((DestinationConfiguration) c));
+      }
+    }
+    return dest.build();
+  }
+
+  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
+    Set<String> names = cfg.getSubsections("remote");
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        result.add(new RemoteConfig(cfg, name));
+      } catch (URISyntaxException e) {
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index aaf2b15..f910a40 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -18,8 +18,8 @@
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
 import com.google.common.base.Charsets;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 6dcc80e..d195aa3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index 07978ab..9264d9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -40,11 +40,11 @@
   @Option(name = "--json", usage = "output in json format")
   private boolean json;
 
-  @Inject private ReplicationConfig config;
+  @Inject private ReplicationDestinations destinations;
 
   @Override
   protected void run() {
-    for (Destination d : config.getDestinations(FilterType.ALL)) {
+    for (Destination d : destinations.getAll(FilterType.ALL)) {
       if (matches(d.getRemoteConfigName())) {
         printRemote(d);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index aa6e16c..da960e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.File;
 import java.io.IOException;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
new file mode 100644
index 0000000..1007ae5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+/** Allows a queue activity to be observed */
+public interface ObservableQueue {
+  /**
+   * Indicates whether the observed queue is running
+   *
+   * @return true, when the queue is running, false otherwise
+   */
+  boolean isRunning();
+
+  /**
+   * Indicates whether the observed queue is replaying queued events
+   *
+   * @return true, when the queue is replaying, false otherwise
+   */
+  boolean isReplaying();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 833b02b..4f60319 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.Nullable;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 56cecfe..b46c278 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -16,19 +16,20 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toMap;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
@@ -329,14 +330,19 @@
     }
 
     repLog.info("Replication to {} started...", uri);
-    Timer1.Context context = metrics.start(config.getName());
+    Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
-      long startedAt = context.getStartTime();
+      long startedAt = destinationContext.getStartTime();
       long delay = NANOSECONDS.toMillis(startedAt - createdAt);
       metrics.record(config.getName(), delay, retryCount);
       git = gitManager.openRepository(projectName);
       runImpl();
-      long elapsed = NANOSECONDS.toMillis(context.stop());
+      long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
+
+      if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
+        metrics.recordSlowProjectReplication(
+            config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
+      }
       repLog.info(
           "Replication to {} completed in {}ms, {}ms delay, {} retries",
           uri,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index fccdb7b..d1ab790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -14,9 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.util.Objects;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
@@ -45,11 +46,40 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  /**
+   * Compares the project, ref, target node, status and ref status of the two events; any other
+   * state of the event is ignored.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicatedEvent)) {
+      return false;
+    }
+    RefReplicatedEvent event = (RefReplicatedEvent) other;
+    return Objects.equals(event.project, this.project)
+        && Objects.equals(event.ref, this.ref)
+        && Objects.equals(event.targetNode, this.targetNode)
+        && Objects.equals(event.status, this.status)
+        && Objects.equals(event.refStatus, this.refStatus);
+  }
+
+  /**
+   * Hashes exactly the fields compared by {@link #equals(Object)}, so that equal events always
+   * share a hash code as required by the {@link Object#hashCode()} contract.
+   *
+   * <p>NOTE(review): the inherited hashCode() is presumably identity-based (confirm in the
+   * superclasses), in which case two equal events could hash differently.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, targetNode, status, refStatus);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 4789a96..e663194 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
 
 public class RefReplicationDoneEvent extends RefEvent {
   public static final String TYPE = "ref-replication-done";
@@ -33,11 +34,35 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  /** Compares project, ref and nodes count; any other state of the event is ignored. */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicationDoneEvent)) {
+      return false;
+    }
+    RefReplicationDoneEvent event = (RefReplicationDoneEvent) other;
+    return Objects.equals(event.project, this.project)
+        && Objects.equals(event.ref, this.ref)
+        && event.nodesCount == this.nodesCount;
+  }
+
+  /**
+   * Hashes exactly the fields compared by {@link #equals(Object)}, so that equal events always
+   * share a hash code as required by the {@link Object#hashCode()} contract.
+   *
+   * <p>NOTE(review): the inherited hashCode() is presumably identity-based (confirm in the
+   * superclasses), in which case two equal events could hash differently.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, nodesCount);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
new file mode 100644
index 0000000..b66e73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+/** Remote configuration for a replication endpoint. */
+public interface RemoteConfiguration {
+  /**
+   * Time to wait before scheduling a remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getDelay();
+  /**
+   * Time to wait before rescheduling a remote replication operation, which might have failed the
+   * first time round. Setting to 0 effectively disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRescheduleDelay();
+  /**
+   * Time to wait before retrying a failed remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRetryDelay();
+  /**
+   * List of the remote endpoint addresses used for replication.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getUrls();
+  /**
+   * List of alternative remote endpoint addresses, used for admin operations, such as repository
+   * creation.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getAdminUrls();
+  /**
+   * List of repositories that should be replicated.
+   *
+   * @return list of project strings
+   */
+  ImmutableList<String> getProjects();
+  /**
+   * List of groups that should be used to access the repositories.
+   *
+   * @return list of group strings
+   */
+  ImmutableList<String> getAuthGroupNames();
+  /**
+   * Influence how the name of the remote repository should be computed.
+   *
+   * @return a string representing a remote style name
+   */
+  String getRemoteNameStyle();
+  /**
+   * If true, permissions-only projects and the refs/meta/config branch will also be replicated.
+   *
+   * @return true if permissions-only projects and refs/meta/config are replicated too
+   */
+  boolean replicatePermissions();
+  /**
+   * The JGit remote configuration representing the replication for this endpoint.
+   *
+   * @return The remote config {@link RemoteConfig}
+   */
+  RemoteConfig getRemoteConfig();
+  /**
+   * Number of times to retry a replication operation.
+   *
+   * @return the number of retries
+   */
+  int getMaxRetries();
+
+  /**
+   * The time duration after which the replication for a project should be considered "slow".
+   *
+   * @return the slow latency threshold
+   */
+  int getSlowLatencyThreshold();
+
+  /**
+   * Whether the remote configuration is for a single project only.
+   *
+   * @return true, when configuration is for a single project, false otherwise
+   */
+  default boolean isSingleProjectMatch() {
+    List<String> projects = getProjects();
+    boolean ret = (projects.size() == 1);
+    if (ret) {
+      String projectMatch = projects.get(0);
+      if (ReplicationFilter.getPatternType(projectMatch)
+          != ReplicationFilter.PatternType.EXACT_MATCH) {
+        // projectMatch is either a regular expression or a wild-card.
+        //
+        // Even though they might refer to a single project now, they need not
+        // after new projects have been created. Hence, we do not treat them as
+        // matching a single project.
+        ret = false;
+      }
+    }
+    return ret;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index ee9d4c0..d4d979f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.IOException;
 import java.io.OutputStream;
 import org.eclipse.jgit.transport.URIish;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 929c538..b981bc8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -11,44 +11,63 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.common.collect.Multimap;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.git.WorkQueue;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
-import org.eclipse.jgit.transport.URIish;
 
+/** Configuration of all the replication end points. */
 public interface ReplicationConfig {
 
+  /** Filter for accessing replication projects. */
   enum FilterType {
     PROJECT_CREATION,
     PROJECT_DELETION,
     ALL
   }
 
-  List<Destination> getDestinations(FilterType filterType);
-
-  Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
-
+  /**
+   * Returns whether, according to the current replication configuration, all the projects are
+   * replicated when the plugin starts.
+   *
+   * @return true if replication at plugin start, false otherwise.
+   */
   boolean isReplicateAllOnPluginStart();
 
+  /**
+   * Returns the default behaviour of the replication plugin when pushing to remote replication
+   * ends. Even though the property name has the 'update' suffix, it actually refers to Git push
+   * operation and not to a Git update.
+   *
+   * @return true if forced push is the default, false otherwise.
+   */
   boolean isDefaultForceUpdate();
 
+  /**
+   * Returns the maximum number of ref-specs to log into the replication_log whenever a push
+   * operation is completed against a replication end.
+   *
+   * @return maximum number of refs to log, zero if unlimited.
+   */
   int getMaxRefsToLog();
 
-  boolean isEmpty();
-
+  /**
+   * Configured location where the replication events are stored on the filesystem for being resumed
+   * and kept across restarts.
+   *
+   * @return path to store persisted events.
+   */
   Path getEventsDirectory();
 
-  int shutdown();
-
-  void startup(WorkQueue workQueue);
-
   int getSshConnectionTimeout();
 
   int getSshCommandTimeout();
+
+  /**
+   * Current logical version string of the current configuration loaded in memory, depending on the
+   * actual implementation of the configuration on the persistent storage.
+   *
+   * @return current logical version number.
+   */
+  String getVersion();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
new file mode 100644
index 0000000..7883114
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+
+public interface ReplicationConfigValidator {
+
+  /**
+   * Validates the new replication.config.
+   *
+   * @param newConfig new configuration detected
+   * @return List of validated {@link RemoteConfiguration}
+   * @throws ConfigInvalidException if the new configuration is not valid.
+   */
+  List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
+      throws ConfigInvalidException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
new file mode 100644
index 0000000..b191d7d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.Multimap;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationDestinations {
+
+  /**
+   * Return all the URIs associated with a project and a filter criterion.
+   *
+   * @param remoteName name of the replication end or empty if selecting all ends.
+   * @param projectName name of the project
+   * @param filterType type of filter criterion for selecting projects
+   * @return the multi-map of destinations and the associated replication URIs
+   */
+  Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
+
+  /**
+   * List of currently active replication destinations.
+   *
+   * @param filterType type of project filtering
+   * @return the list of active destinations
+   */
+  List<Destination> getAll(FilterType filterType);
+
+  /** @return true if there are no destinations, false otherwise. */
+  boolean isEmpty();
+
+  /**
+   * Start replicating to all destinations.
+   *
+   * @param workQueue execution queue for scheduling the replication events.
+   */
+  void startup(WorkQueue workQueue);
+
+  /**
+   * Stop the replication to all destinations.
+   *
+   * @return number of events cancelled during shutdown.
+   */
+  int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 1f0c40e..554e441 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -13,51 +13,19 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
-import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-
 import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.SetMultimap;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
 
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
-  private List<Destination> destinations;
   private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
@@ -69,177 +37,17 @@
   private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(
-      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
-      throws ConfigInvalidException, IOException {
+  public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) {
     this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
-    this.destinations = allDestinations(destinationFactory);
+    this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
+    this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+    this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
     this.pluginDataDir = pluginDataDir;
   }
 
-  /*
-   * (non-Javadoc)
-   * @see
-   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations
-   * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType)
-   */
-  @Override
-  public List<Destination> getDestinations(FilterType filterType) {
-    Predicate<? super Destination> filter;
-    switch (filterType) {
-      case PROJECT_CREATION:
-        filter = dest -> dest.isCreateMissingRepos();
-        break;
-      case PROJECT_DELETION:
-        filter = dest -> dest.isReplicateProjectDeletions();
-        break;
-      case ALL:
-      default:
-        filter = dest -> true;
-        break;
-    }
-    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
-  }
-
-  private List<Destination> allDestinations(Destination.Factory destinationFactory)
-      throws ConfigInvalidException, IOException {
-    if (!config.getFile().exists()) {
-      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-    if (config.getFile().length() == 0) {
-      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-
-    try {
-      config.load();
-    } catch (ConfigInvalidException e) {
-      throw new ConfigInvalidException(
-          String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e);
-    } catch (IOException e) {
-      throw new IOException(
-          String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
-    }
-
-    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
-
-    defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
-
-    maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
-
-    sshCommandTimeout =
-        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
-    sshConnectionTimeout =
-        (int)
-            ConfigUtil.getTimeUnit(
-                config,
-                "gerrit",
-                null,
-                "sshConnectionTimeout",
-                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
-                MILLISECONDS);
-
-    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
-    for (RemoteConfig c : allRemotes(config)) {
-      if (c.getURIs().isEmpty()) {
-        continue;
-      }
-
-      // If destination for push is not set assume equal to source.
-      for (RefSpec ref : c.getPushRefSpecs()) {
-        if (ref.getDestination() == null) {
-          ref.setDestination(ref.getSource());
-        }
-      }
-
-      if (c.getPushRefSpecs().isEmpty()) {
-        c.addPushRefSpec(
-            new RefSpec()
-                .setSourceDestination("refs/*", "refs/*")
-                .setForceUpdate(defaultForceUpdate));
-      }
-
-      Destination destination = destinationFactory.create(new DestinationConfiguration(c, config));
-
-      if (!destination.isSingleProjectMatch()) {
-        for (URIish u : c.getURIs()) {
-          if (u.getPath() == null || !u.getPath().contains("${name}")) {
-            throw new ConfigInvalidException(
-                String.format(
-                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
-                    c.getName(), u, config.getFile()));
-          }
-        }
-      }
-
-      dest.add(destination);
-    }
-    return dest.build();
-  }
-
-  @Override
-  public Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    if (getDestinations(filterType).isEmpty()) {
-      return ImmutableMultimap.of();
-    }
-
-    SetMultimap<Destination, URIish> uris = HashMultimap.create();
-    for (Destination config : getDestinations(filterType)) {
-      if (!config.wouldPushProject(projectName)) {
-        continue;
-      }
-
-      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
-        continue;
-      }
-
-      boolean adminURLUsed = false;
-
-      for (String url : config.getAdminUrls()) {
-        if (Strings.isNullOrEmpty(url)) {
-          continue;
-        }
-
-        URIish uri;
-        try {
-          uri = new URIish(url);
-        } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
-          continue;
-        }
-
-        if (!isGerrit(uri) && !isGerritHttp(uri)) {
-          String path =
-              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
-          if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
-            continue;
-          }
-
-          uri = uri.setPath(path);
-          if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
-            continue;
-          }
-        }
-        uris.put(config, uri);
-        adminURLUsed = true;
-      }
-
-      if (!adminURLUsed) {
-        for (URIish uri : config.getURIs(projectName, "*")) {
-          uris.put(config, uri);
-        }
-      }
-    }
-    return uris;
-  }
-
-  static String replaceName(String in, String name, boolean keyIsOptional) {
+  public static String replaceName(String in, String name, boolean keyIsOptional) {
     String key = "${name}";
     int n = in.indexOf(key);
     if (0 <= n) {
@@ -272,28 +80,6 @@
     return maxRefsToLog;
   }
 
-  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
-    Set<String> names = cfg.getSubsections("remote");
-    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      try {
-        result.add(new RemoteConfig(cfg, name));
-      } catch (URISyntaxException e) {
-        throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
-      }
-    }
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty()
-   */
-  @Override
-  public boolean isEmpty() {
-    return destinations.isEmpty();
-  }
-
   @Override
   public Path getEventsDirectory() {
     String eventsDirectory = config.getString("replication", null, "eventsDirectory");
@@ -307,67 +93,13 @@
     return cfgPath;
   }
 
-  @Override
-  public int shutdown() {
-    int discarded = 0;
-    for (Destination cfg : destinations) {
-      try {
-        drainReplicationEvents(cfg);
-      } catch (EventQueueNotEmptyException e) {
-        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
-      } finally {
-        discarded += cfg.shutdown();
-      }
-    }
-    return discarded;
-  }
-
-  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
-    int drainQueueAttempts = destination.getDrainQueueAttempts();
-    if (drainQueueAttempts == 0) {
-      return;
-    }
-    int pending = destination.getQueueInfo().pending.size();
-    int inFlight = destination.getQueueInfo().inFlight.size();
-
-    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
-      try {
-        logger.atInfo().log(
-            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
-            inFlight, pending);
-        Thread.sleep(destination.getReplicationDelaySeconds());
-      } catch (InterruptedException ie) {
-        logger.atWarning().withCause(ie).log(
-            "Wait for replication events to drain has been interrupted");
-      }
-      pending = destination.getQueueInfo().pending.size();
-      inFlight = destination.getQueueInfo().inFlight.size();
-      drainQueueAttempts--;
-    }
-
-    if (pending > 0 || inFlight > 0) {
-      throw new EventQueueNotEmptyException(
-          String.format("Pending: %d - InFlight: %d", pending, inFlight));
-    }
-  }
-
-  public static class EventQueueNotEmptyException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EventQueueNotEmptyException(String errorMessage) {
-      super(errorMessage);
-    }
-  }
-
-  FileBasedConfig getConfig() {
+  public FileBasedConfig getConfig() {
     return config;
   }
 
   @Override
-  public void startup(WorkQueue workQueue) {
-    for (Destination cfg : destinations) {
-      cfg.start(workQueue);
-    }
+  public String getVersion() {
+    return Long.toString(config.getFile().lastModified());
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 05bbb03..5b4204e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.util.Collections;
 import java.util.List;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
index afc7926..1bc17ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
@@ -14,11 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.Histogram3;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -27,10 +30,37 @@
   private final Timer1<String> executionTime;
   private final Histogram1<String> executionDelay;
   private final Histogram1<String> executionRetries;
+  private final Histogram3<Integer, String, String> slowProjectReplicationLatency;
 
   @Inject
-  ReplicationMetrics(MetricMaker metricMaker) {
-    Field<String> DEST_FIELD = Field.ofString("destination");
+  ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> DEST_FIELD =
+        Field.ofString(
+                "destination",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("destination", fieldValue)))
+            .build();
+
+    Field<String> PROJECT_FIELD =
+        Field.ofString(
+                "project",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("project", fieldValue)))
+            .build();
+
+    Field<Integer> SLOW_THRESHOLD_FIELD =
+        Field.ofInteger(
+                "slow_threshold",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(
+                            PluginMetadata.create("slow_threshold", fieldValue.toString())))
+            .build();
 
     executionTime =
         metricMaker.newTimer(
@@ -55,6 +85,17 @@
                 .setCumulative()
                 .setUnit("retries"),
             DEST_FIELD);
+
+    slowProjectReplicationLatency =
+        metricMaker.newHistogram(
+            "latency_slower_than_threshold",
+            new Description(
+                    "latency for project to destination, where latency was slower than threshold")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SLOW_THRESHOLD_FIELD,
+            PROJECT_FIELD,
+            DEST_FIELD);
   }
 
   /**
@@ -63,7 +104,7 @@
    * @param name the destination name.
    * @return the timer context.
    */
-  Timer1.Context start(String name) {
+  Timer1.Context<String> start(String name) {
     return executionTime.start(name);
   }
 
@@ -78,4 +119,17 @@
     executionDelay.record(name, delay);
     executionRetries.record(name, retries);
   }
+
+  /**
+   * Record replication latency for project to destination, where latency was slower than threshold
+   *
+   * @param destinationName the destination name, recorded as the {@code destination} field.
+   * @param projectName the project name, recorded as the {@code project} field.
+   * @param slowThreshold the threshold that was exceeded, recorded as {@code slow_threshold}.
+   * @param latency the replication latency to record, in milliseconds.
+   */
+  void recordSlowProjectReplication(
+      String destinationName, String projectName, Integer slowThreshold, long latency) {
+    slowProjectReplicationLatency.record(slowThreshold, projectName, destinationName, latency);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 835d068..979c8e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -23,18 +24,35 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.util.FS;
 
 class ReplicationModule extends AbstractModule {
+  private final Path cfgPath;
+
+  @Inject
+  public ReplicationModule(SitePaths site) {
+    cfgPath = site.etc_dir.resolve("replication.config");
+  }
+
   @Override
   protected void configure() {
     install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(ObservableQueue.class).to(ReplicationQueue.class);
     bind(LifecycleListener.class)
         .annotatedWith(UniqueAnnotations.create())
         .to(ReplicationQueue.class);
@@ -57,7 +75,19 @@
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
 
-    bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+    bind(EventBus.class).in(Scopes.SINGLETON);
+    bind(ReplicationDestinations.class).to(DestinationsCollection.class);
+    bind(ReplicationConfigValidator.class).to(DestinationsCollection.class);
+
+    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+      bind(LifecycleListener.class)
+          .annotatedWith(UniqueAnnotations.create())
+          .to(AutoReloadConfigDecorator.class);
+    } else {
+      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
@@ -68,4 +98,15 @@
 
     bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
+
+  private FileBasedConfig getReplicationConfig() {
+    File replicationConfigFile = cfgPath.toFile();
+    FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+    }
+    return config;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 8030d28..e9a60e4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,15 +17,16 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
@@ -39,7 +40,8 @@
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
-    implements LifecycleListener,
+    implements ObservableQueue,
+        LifecycleListener,
         GitReferenceUpdatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
@@ -50,7 +52,7 @@
 
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
-  private final ReplicationConfig config;
+  private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
@@ -59,13 +61,13 @@
   @Inject
   ReplicationQueue(
       WorkQueue wq,
-      ReplicationConfig rc,
+      Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
     workQueue = wq;
     dispatcher = dis;
-    config = rc;
+    destinations = rd;
     stateLog = sl;
     replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
@@ -74,8 +76,9 @@
   @Override
   public void start() {
     if (!running) {
-      config.startup(workQueue);
+      destinations.get().startup(workQueue);
       running = true;
+      replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
     }
@@ -84,16 +87,18 @@
   @Override
   public void stop() {
     running = false;
-    int discarded = config.shutdown();
+    int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
   }
 
+  @Override
   public boolean isRunning() {
     return running;
   }
 
+  @Override
   public boolean isReplaying() {
     return replaying;
   }
@@ -105,49 +110,43 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
-      return;
-    }
-
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project)) {
-        for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(
-                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
-        }
-      }
-    }
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName());
   }
 
-  private void onGitReferenceUpdated(String projectName, String refName) {
+  private void fire(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    fire(Project.nameKey(projectName), null, refName, state, false);
+    state.markAllPushTasksScheduled();
+  }
+
+  private void fire(
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(projectName);
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
+    for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        for (URIish uri : cfg.getURIs(project, null)) {
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
-          cfg.schedule(project, refName, uri, state);
+        for (URIish uri : cfg.getURIs(project, urlMatch)) {
+          replicationTasksStorage.create(
+              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          cfg.schedule(project, refName, uri, state, now);
         }
       }
     }
-    state.markAllPushTasksScheduled();
   }
 
   private void firePendingEvents() {
@@ -155,11 +154,11 @@
     try {
       Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.info("Firing pending task {}", eventKey);
-          onGitReferenceUpdated(t.project, t.ref);
+          fire(t.project, t.ref);
           eventsReplayed.add(eventKey);
         }
       }
@@ -170,15 +169,15 @@
 
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
         .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
   }
 
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
         .forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
   }
 
@@ -188,7 +187,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName());
         eventsReplayed.add(eventKey);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index aa965fe..28f6b6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,6 +38,6 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 39158f3..3e6c4d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -27,11 +27,33 @@
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under {@code <replication_data>/ref-updates} where
+ * replication_data is determined by the replication.eventsDirectory config option and defaults to
+ * {@code <site_dir>/data/replication}. Atomic renames must be supported from anywhere within the
+ * store to anywhere within the store. This generally means that all the contents of the store
+ * need to live on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <pre>{@code
+ * .../building/<tmp_name>  new replication tasks under construction
+ * .../running/<sha1>       running replication tasks
+ * .../waiting/<sha1>       outstanding replication tasks
+ * }</pre>
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
 @Singleton
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -44,6 +66,10 @@
     public final String uri;
     public final String remote;
 
+    public ReplicateRefUpdate(PushOne push, String ref) {
+      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    }
+
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
       this.project = project;
       this.ref = ref;
@@ -60,28 +86,20 @@
   private static final Gson GSON = new Gson();
 
   private final Path refUpdates;
+  private final Path buildingUpdates;
+  private final Path runningUpdates;
+  private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
     refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    buildingUpdates = refUpdates.resolve("building");
+    runningUpdates = refUpdates.resolve("running");
+    waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String persist(ReplicateRefUpdate r) {
-    String json = GSON.toJson(r) + "\n";
-    String eventKey = sha1(json).name();
-    Path file = refUpdates().resolve(eventKey);
-
-    if (Files.exists(file)) {
-      return eventKey;
-    }
-
-    try {
-      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.write(file, json.getBytes(UTF_8));
-    } catch (IOException e) {
-      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
-    }
-    return eventKey;
+  public synchronized String create(ReplicateRefUpdate r) {
+    return new Task(r).create();
   }
 
   @VisibleForTesting
@@ -89,37 +107,64 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public void delete(ReplicateRefUpdate r) {
-    String taskJson = GSON.toJson(r) + "\n";
-    String taskKey = sha1(taskJson).name();
-    Path file = refUpdates().resolve(taskKey);
-
-    if (disableDeleteForTesting) {
-      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
-      return;
-    }
-
-    try {
-      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.delete(file);
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+  public synchronized void start(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).start();
     }
   }
 
-  public List<ReplicateRefUpdate> list() {
-    ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+  public synchronized void reset(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).reset();
+    }
+  }
+
+  public synchronized void resetAll() {
+    for (ReplicateRefUpdate r : listRunning()) {
+      new Task(r).reset();
+    }
+  }
+
+  public synchronized void finish(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).finish();
+    }
+  }
+
+  public synchronized List<ReplicateRefUpdate> listWaiting() {
+    return list(createDir(waitingUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> listRunning() {
+    return list(createDir(runningUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> listBuilding() {
+    return list(createDir(buildingUpdates));
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> list() {
+    return list(createDir(refUpdates));
+  }
+
+  private List<ReplicateRefUpdate> list(Path tasks) {
+    List<ReplicateRefUpdate> results = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
       for (Path e : events) {
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
-          result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+          results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        } else if (Files.isDirectory(e)) {
+          results.addAll(list(e));
         }
       }
     } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error when firing pending events");
+      logger.atSevere().withCause(e).log("Error while listing tasks");
     }
-    return result;
+    return results;
   }
 
   @SuppressWarnings("deprecation")
@@ -127,11 +172,80 @@
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
-  private Path refUpdates() {
+  private static Path createDir(Path dir) {
     try {
-      return Files.createDirectories(refUpdates);
+      return Files.createDirectories(dir);
     } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+      throw new ProvisionException(String.format("Couldn't create %s", dir), e);
+    }
+  }
+
+  private class Task {
+    public final ReplicateRefUpdate update;
+    public final String json;
+    public final String taskKey;
+    public final Path running;
+    public final Path waiting;
+
+    public Task(ReplicateRefUpdate update) {
+      this.update = update;
+      json = GSON.toJson(update) + "\n";
+      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      taskKey = sha1(key).name();
+      running = createDir(runningUpdates).resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
+    }
+
+    public String create() {
+      if (Files.exists(waiting)) {
+        return taskKey;
+      }
+
+      try {
+        Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
+        logger.atFine().log("CREATE %s %s", tmp, updateLog());
+        Files.write(tmp, json.getBytes(UTF_8));
+        logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog());
+        rename(tmp, waiting);
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log("Couldn't create task %s", json);
+      }
+      return taskKey;
+    }
+
+    public void start() {
+      rename(waiting, running);
+    }
+
+    public void reset() {
+      rename(running, waiting);
+    }
+
+    public void finish() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", running, updateLog());
+        Files.delete(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+      }
+    }
+
+    private void rename(Path from, Path to) {
+      try {
+        logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 18a4cc2..ed15b92 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -26,11 +26,11 @@
 import org.eclipse.jgit.util.FS;
 
 /** Looks up a remote's password in secure.config. */
-class SecureCredentialsFactory implements CredentialsFactory {
+public class SecureCredentialsFactory implements CredentialsFactory {
   private final Config config;
 
   @Inject
-  SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
+  public SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
     config = load(site);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index 70452b4..ffa6be1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 32bc630..7233061 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -142,6 +142,11 @@
 	persisted events will not be deleted. When the plugin is started again,
 	it will trigger all replications found under this directory.
 
+	For replication to work, it is important that atomic renames be possible
+	from within any subdirectory of the eventsDirectory to within any other
+	subdirectory of the eventsDirectory. This generally means that the entire
+	contents of the eventsDirectory should live on the same filesystem.
+
 	When not set, defaults to the plugin's data directory.
 
 remote.NAME.url
@@ -419,6 +424,15 @@
 	By default, replicates without matching, i.e. replicates
 	everything to all remotes.
 
+remote.NAME.slowLatencyThreshold
+:	the time duration after which the replication of a project to this
+	destination will be considered "slow". A slow project replication
+	will cause additional metrics to be exposed for further investigation.
+	See [metrics.md](metrics.md) for further details.
+
+	default: 15 minutes
+
+
 File `secure.config`
 --------------------
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..ef8b150
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,61 @@
+# Metrics
+
+Some metrics are emitted when replication occurs to a remote destination.
+The granularity of the metrics recorded is at destination level; however, when a particular project replication is flagged
+as slow, project-level metrics are emitted as well. This happens when the replication took longer than the allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md)).
+
+The reason only slow project metrics are published, rather than all of them, is to limit their number, which on a big Gerrit installation
+could otherwise become very large.
+
+### Project level
+
+* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
+
+### Destination level
+
+* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
+* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
+* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
+
+### Example
+```
+# HELP plugins_replication_replication_delay_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destinationName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_delay_destinationName summary
+plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0
+plugins_replication_replication_delay_destinationName_count 3.0
+
+# HELP plugins_replication_replication_retries_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destinationName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_retries_destinationName summary
+plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0
+plugins_replication_replication_retries_destinationName_count 3.0
+
+# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer)
+# TYPE plugins_replication_replication_latency_destinationName summary
+plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002
+plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881
+plugins_replication_replication_latency_destinationName_count 2.0
+
+# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName_count 1.0
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 77dc1cc..3932fb3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -16,26 +16,28 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.file.Files.createTempDirectory;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+import com.google.inject.util.Providers;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Ignore
 public abstract class AbstractConfigTest {
@@ -43,6 +45,10 @@
   protected final SitePaths sitePaths;
   protected final Destination.Factory destinationFactoryMock;
   protected final Path pluginDataPath;
+  protected ReplicationQueue replicationQueueMock;
+  protected WorkQueue workQueueMock;
+  protected EventBus eventBus = new EventBus();
+  protected FakeExecutorService executorService = new FakeExecutorService();
 
   static class FakeDestination extends Destination {
     public final DestinationConfiguration config;
@@ -53,11 +59,9 @@
     }
 
     private static Injector injectorMock() {
-      Injector injector = createNiceMock(Injector.class);
-      Injector childInjectorMock = createNiceMock(Injector.class);
-      expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock);
-      replay(childInjectorMock);
-      replay(injector);
+      Injector injector = mock(Injector.class);
+      Injector childInjectorMock = mock(Injector.class);
+      when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock);
       return injector;
     }
   }
@@ -66,21 +70,25 @@
     sitePath = createTempPath("site");
     sitePaths = new SitePaths(sitePath);
     pluginDataPath = createTempPath("data");
-    destinationFactoryMock = createMock(Destination.Factory.class);
+    destinationFactoryMock = mock(Destination.Factory.class);
   }
 
   @Before
   public void setup() {
-    expect(destinationFactoryMock.create(isA(DestinationConfiguration.class)))
-        .andAnswer(
-            new IAnswer<Destination>() {
+    when(destinationFactoryMock.create(any(DestinationConfiguration.class)))
+        .thenAnswer(
+            new Answer<Destination>() {
               @Override
-              public Destination answer() throws Throwable {
-                return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]);
+              public Destination answer(InvocationOnMock invocation) throws Throwable {
+                return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]);
               }
-            })
-        .anyTimes();
-    replay(destinationFactoryMock);
+            });
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+
+    workQueueMock = mock(WorkQueue.class);
+    when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService);
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
@@ -113,4 +121,13 @@
 
     assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls);
   }
+
+  protected DestinationsCollection newDestinationsCollections(
+      ReplicationFileBasedConfig replicationFileBasedConfig) throws ConfigInvalidException {
+    return new DestinationsCollection(
+        destinationFactoryMock,
+        Providers.of(replicationQueueMock),
+        replicationFileBasedConfig,
+        eventBus);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 211cafa..d85f622 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -15,127 +15,19 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.util.Providers;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Before;
 import org.junit.Test;
 
 public class AutoReloadConfigDecoratorTest extends AbstractConfigTest {
-  private AutoReloadConfigDecorator autoReloadConfig;
-  private ReplicationQueue replicationQueueMock;
-  private WorkQueue workQueueMock;
-  private FakeExecutorService executorService = new FakeExecutorService();
-
-  public class FakeExecutorService implements ScheduledExecutorService {
-    public Runnable refreshCommand;
-
-    @Override
-    public void shutdown() {}
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      return null;
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return false;
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return false;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-      return false;
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      return null;
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-      return null;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(
-        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return null;
-    }
-
-    @Override
-    public void execute(Runnable command) {}
-
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(
-        Runnable command, long initialDelay, long period, TimeUnit unit) {
-      refreshCommand = command;
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(
-        Runnable command, long initialDelay, long delay, TimeUnit unit) {
-      return null;
-    }
-  }
+  ReplicationFileBasedConfig replicationFileBasedConfig;
 
   public AutoReloadConfigDecoratorTest() throws IOException {
     super();
@@ -146,39 +38,7 @@
   public void setup() {
     super.setup();
 
-    setupMocks();
-  }
-
-  private void setupMocks() {
-    replicationQueueMock = createNiceMock(ReplicationQueue.class);
-    expect(replicationQueueMock.isRunning()).andReturn(true);
-    replay(replicationQueueMock);
-
-    workQueueMock = createNiceMock(WorkQueue.class);
-    expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService);
-    replay(workQueueMock);
-  }
-
-  @Test
-  public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    String remoteName = "foo";
-    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
-    replicationConfig.setString("remote", remoteName, "url", remoteUrl);
-    replicationConfig.save();
-
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
-    assertThat(destinations).hasSize(1);
-    assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
+    replicationFileBasedConfig = newReplicationFileBasedConfig();
   }
 
   @Test
@@ -190,17 +50,12 @@
     replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
     replicationConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    newAutoReloadConfig().start();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(replicationFileBasedConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -212,7 +67,7 @@
     replicationConfig.save();
     executorService.refreshCommand.run();
 
-    destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
@@ -227,17 +82,10 @@
     replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
     replicationConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(replicationFileBasedConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -247,6 +95,23 @@
     replicationConfig.save();
     executorService.refreshCommand.run();
 
-    assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations);
+    assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
+  }
+
+  private AutoReloadConfigDecorator newAutoReloadConfig() throws ConfigInvalidException {
+    AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            newDestinationsCollections(replicationFileBasedConfig),
+            replicationFileBasedConfig,
+            sitePaths,
+            pluginDataPath,
+            eventBus,
+            Providers.of(replicationQueueMock));
+    return new AutoReloadConfigDecorator(
+        "replication", workQueueMock, replicationFileBasedConfig, autoReloadRunnable, eventBus);
+  }
+
+  private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
new file mode 100644
index 0000000..b1aa7c8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.util.Providers;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoReloadRunnableTest {
+
+  private SitePaths sitePaths;
+  private EventBus eventBus;
+  private ReloadTrackerSubscriber onReloadSubscriber;
+  private String pluginName;
+  private ReplicationQueue replicationQueueMock;
+
+  @Before
+  public void setUp() throws IOException {
+    pluginName = "replication"; // assigned first: it is the temp-file prefix on the next line
+    Path tmp = Files.createTempFile(pluginName, "_site");
+    Files.deleteIfExists(tmp); // drop the file; only its unique path is handed to SitePaths
+    sitePaths = new SitePaths(tmp);
+    eventBus = new EventBus();
+    onReloadSubscriber = new ReloadTrackerSubscriber();
+    eventBus.register(onReloadSubscriber);
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+  }
+
+  @Test
+  public void configurationIsReloadedWhenValidationSucceeds() {
+    ReplicationConfigValidator validator = new TestValidConfigurationListener();
+
+    attemptAutoReload(validator);
+
+    assertThat(onReloadSubscriber.reloaded).isTrue();
+  }
+
+  @Test
+  public void configurationIsNotReloadedWhenValidationFails() {
+    ReplicationConfigValidator validator = new TestInvalidConfigurationListener();
+
+    attemptAutoReload(validator);
+
+    assertThat(onReloadSubscriber.reloaded).isFalse();
+  }
+
+  private void attemptAutoReload(ReplicationConfigValidator validator) {
+    final AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            validator,
+            newVersionConfig(),
+            sitePaths,
+            sitePaths.data_dir,
+            eventBus,
+            Providers.of(replicationQueueMock));
+
+    autoReloadRunnable.run();
+  }
+
+  private ReplicationFileBasedConfig newVersionConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
+      @Override
+      public String getVersion() {
+        return String.format("%s", System.nanoTime());
+      }
+    };
+  }
+
+  private static class ReloadTrackerSubscriber {
+    public boolean reloaded = false;
+
+    @Subscribe
+    public void onReload(
+        @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) {
+      reloaded = true;
+    }
+  }
+
+  private static class TestValidConfigurationListener implements ReplicationConfigValidator {
+    @Override
+    public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig) {
+      return Collections.emptyList();
+    }
+  }
+
+  private static class TestInvalidConfigurationListener implements ReplicationConfigValidator {
+    @Override
+    public List<RemoteConfiguration> validateConfig(
+        ReplicationFileBasedConfig configurationChangeEvent) throws ConfigInvalidException {
+      throw new ConfigInvalidException("expected test failure");
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
new file mode 100644
index 0000000..2f7059e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
@@ -0,0 +1,118 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeExecutorService implements ScheduledExecutorService {
+  public Runnable refreshCommand = () -> {};
+
+  @Override
+  public void shutdown() {}
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    return null;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return false;
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return null;
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return null;
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return null;
+  }
+
+  @Override
+  public void execute(Runnable command) {}
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    refreshCommand = command;
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return null;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 0f6d629..2ee9a39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -14,11 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -36,14 +35,12 @@
 
   @Before
   public void setUp() throws Exception {
-    dispatcherMock = createMock(EventDispatcher.class);
-    replay(dispatcherMock);
+    dispatcherMock = mock(EventDispatcher.class); // fresh Mockito mock for every test
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
   }
 
   @Test
   public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -51,9 +48,6 @@
             "someHost",
             RefPushResult.SUCCEEDED,
             RemoteRefUpdate.Status.OK);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -61,12 +55,11 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.SUCCEEDED,
         RemoteRefUpdate.Status.OK);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -74,9 +67,6 @@
             "someHost",
             RefPushResult.FAILED,
             RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -84,19 +74,15 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.FAILED,
         RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void onAllNodesReplicated() throws PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicationDoneEvent expectedDoneEvent =
         new RefReplicationDoneEvent("someProject", "refs/heads/master", 5);
-    dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent)); // eq() uses equals()
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c010ddb..c09fcd1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,19 +14,18 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.permissions.PermissionBackend;
@@ -45,8 +44,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import junit.framework.AssertionFailedError;
-import org.easymock.IAnswer;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
@@ -68,6 +65,8 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class PushOneTest {
   private static final int TEST_PUSH_TIMEOUT_SECS = 10;
@@ -86,7 +85,7 @@
   private IdGenerator idGeneratorMock;
   private ReplicationStateListeners replicationStateListenersMock;
   private ReplicationMetrics replicationMetricsMock;
-  private Timer1.Context timerContextMock;
+  private Timer1.Context<String> timerContextMock;
   private ProjectCache projectCacheMock;
   private TransportFactory transportFactoryMock;
   private Transport transportMock;
@@ -108,7 +107,7 @@
 
   @Before
   public void setup() throws Exception {
-    projectNameKey = new Project.NameKey("fooProject");
+    projectNameKey = Project.nameKey("fooProject");
     urish = new URIish("http://foo.com/fooProject.git");
 
     newLocalRef =
@@ -126,6 +125,7 @@
     setupMocks();
   }
 
+  @SuppressWarnings("unchecked")
   private void setupMocks() throws Exception {
     FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
     config.setString("remote", "Replication", "push", "foo");
@@ -134,8 +134,8 @@
     setupRepositoryMock(config);
     setupGitRepoManagerMock();
 
-    projectStateMock = createNiceMock(ProjectState.class);
-    forProjectMock = createNiceMock(ForProject.class);
+    projectStateMock = mock(ProjectState.class);
+    forProjectMock = mock(ForProject.class);
     setupWithUserMock();
     setupPermissionBackedMock();
 
@@ -144,46 +144,22 @@
     setupRefSpecMock();
     setupRemoteConfigMock();
 
-    credentialsFactory = createNiceMock(CredentialsFactory.class);
+    credentialsFactory = mock(CredentialsFactory.class);
 
     setupFetchConnectionMock();
     setupPushConnectionMock();
     setupRequestScopeMock();
-    idGeneratorMock = createNiceMock(IdGenerator.class);
-    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+    idGeneratorMock = mock(IdGenerator.class);
+    replicationStateListenersMock = mock(ReplicationStateListeners.class);
 
-    timerContextMock = createNiceMock(Timer1.Context.class);
+    timerContextMock = mock(Timer1.Context.class);
     setupReplicationMetricsMock();
 
     setupTransportMock();
 
     setupProjectCacheMock();
 
-    replicationConfigMock = createNiceMock(ReplicationConfig.class);
-
-    replay(
-        gitRepositoryManagerMock,
-        refUpdateMock,
-        repositoryMock,
-        permissionBackendMock,
-        destinationMock,
-        remoteConfigMock,
-        credentialsFactory,
-        threadRequestScoperMock,
-        idGeneratorMock,
-        replicationStateListenersMock,
-        replicationMetricsMock,
-        projectCacheMock,
-        timerContextMock,
-        transportFactoryMock,
-        projectStateMock,
-        withUserMock,
-        forProjectMock,
-        fetchConnection,
-        pushConnection,
-        refSpecMock,
-        refDatabaseMock,
-        replicationConfigMock);
+    replicationConfigMock = mock(ReplicationConfig.class);
   }
 
   @Test
@@ -212,10 +188,7 @@
 
     PushResult pushResult = new PushResult();
 
-    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
-        .andReturn(pushResult)
-        .once();
-    replay(transportMock);
+    when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult);
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
@@ -223,8 +196,6 @@
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
-
-    verify(transportMock);
   }
 
   @Test
@@ -241,12 +212,6 @@
               }
             });
 
-    // easymock way to check if method was never called
-    expect(transportMock.push(anyObject(), anyObject()))
-        .andThrow(new AssertionFailedError())
-        .anyTimes();
-    replay(transportMock);
-
     PushOne pushOne = createPushOne(replicationPushFilter);
 
     pushOne.addRef(PushOne.ALL_REFS);
@@ -254,7 +219,7 @@
 
     isCallFinished.await(10, TimeUnit.SECONDS);
 
-    verify(transportMock);
+    verify(transportMock, never()).push(any(), any());
   }
 
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
@@ -281,31 +246,31 @@
   }
 
   private void setupProjectCacheMock() throws IOException {
-    projectCacheMock = createNiceMock(ProjectCache.class);
-    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+    projectCacheMock = mock(ProjectCache.class); // only checkedGet(projectNameKey) is stubbed
+    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
-    transportMock = createNiceMock(Transport.class);
-    expect(transportMock.openFetch()).andReturn(fetchConnection);
-    transportFactoryMock = createNiceMock(TransportFactory.class);
-    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+    transportMock = mock(Transport.class); // push() is left for each test to stub or verify
+    when(transportMock.openFetch()).thenReturn(fetchConnection);
+    transportFactoryMock = mock(TransportFactory.class); // returns transportMock from open()
+    when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock);
   }
 
   private void setupReplicationMetricsMock() {
-    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
-    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+    replicationMetricsMock = mock(ReplicationMetrics.class); // start() yields timerContextMock
+    when(replicationMetricsMock.start(any())).thenReturn(timerContextMock);
   }
 
   private void setupRequestScopeMock() {
-    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
-    expect(threadRequestScoperMock.scope(anyObject()))
-        .andAnswer(
-            new IAnswer<Callable<Object>>() {
+    threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class);
+    when(threadRequestScoperMock.scope(any()))
+        .thenAnswer(
+            new Answer<Callable<Object>>() {
               @SuppressWarnings("unchecked")
               @Override
-              public Callable<Object> answer() throws Throwable {
-                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+              public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
                 return new Callable<Object>() {
 
                   @Override
@@ -316,66 +281,64 @@
                   }
                 };
               }
-            })
-        .anyTimes();
+            });
   }
 
   private void setupPushConnectionMock() {
-    pushConnection = createNiceMock(PushConnection.class);
-    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+    pushConnection = mock(PushConnection.class); // getRefsMap() reports remoteRefs
+    when(pushConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupFetchConnectionMock() {
-    fetchConnection = createNiceMock(FetchConnection.class);
-    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+    fetchConnection = mock(FetchConnection.class); // getRefsMap() reports remoteRefs
+    when(fetchConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupRemoteConfigMock() {
-    remoteConfigMock = createNiceMock(RemoteConfig.class);
-    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+    remoteConfigMock = mock(RemoteConfig.class); // exposes refSpecMock as its only push refspec
+    when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock));
   }
 
   private void setupRefSpecMock() {
-    refSpecMock = createNiceMock(RefSpec.class);
-    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
-    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
-    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
-    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+    refSpecMock = mock(RefSpec.class); // accepts any source String and expands to itself
+    when(refSpecMock.matchSource(any(String.class))).thenReturn(true);
+    when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock);
+    when(refSpecMock.getDestination()).thenReturn("fooProject");
+    when(refSpecMock.isForceUpdate()).thenReturn(false); // same as Mockito's boolean default
   }
 
   private void setupDestinationMock() {
-    destinationMock = createNiceMock(Destination.class);
-    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+    destinationMock = mock(Destination.class); // every requestRunway() call is allowed
+    when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed());
   }
 
   private void setupPermissionBackedMock() {
-    permissionBackendMock = createNiceMock(PermissionBackend.class);
-    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+    permissionBackendMock = mock(PermissionBackend.class); // currentUser() yields withUserMock
+    when(permissionBackendMock.currentUser()).thenReturn(withUserMock);
   }
 
   private void setupWithUserMock() {
-    withUserMock = createNiceMock(WithUser.class);
-    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+    withUserMock = mock(WithUser.class); // project(projectNameKey) yields forProjectMock
+    when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock);
   }
 
   private void setupGitRepoManagerMock() throws IOException {
-    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
-    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+    gitRepositoryManagerMock = mock(GitRepositoryManager.class); // opens repositoryMock
+    when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock);
   }
 
   private void setupRepositoryMock(FileBasedConfig config) throws IOException {
-    repositoryMock = createNiceMock(Repository.class);
-    refDatabaseMock = createNiceMock(RefDatabase.class);
-    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
-    expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
-    expect(refDatabaseMock.getRefs()).andReturn(localRefs);
-    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+    repositoryMock = mock(Repository.class); // updateRef key equals refSpecMock's destination
+    refDatabaseMock = mock(RefDatabase.class); // getRefs() reports localRefs
+    when(repositoryMock.getConfig()).thenReturn(config);
+    when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock);
+    when(refDatabaseMock.getRefs()).thenReturn(localRefs);
+    when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock);
   }
 
   private void setupRefUpdateMock() {
-    refUpdateMock = createNiceMock(RefUpdate.class);
-    expect(refUpdateMock.getOldObjectId())
-        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
-        .anyTimes();
+    refUpdateMock = mock(RefUpdate.class); // getOldObjectId() returns a fixed placeholder id
+    when(refUpdateMock.getOldObjectId())
+        .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
deleted file mode 100644
index 983e97f..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicatedEventEquals implements IArgumentMatcher {
-
-  private RefReplicatedEvent expected;
-
-  public RefReplicatedEventEquals(RefReplicatedEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicatedEvent)) {
-      return false;
-    }
-    RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedEvent.ref)) {
-      return false;
-    }
-    if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) {
-      return false;
-    }
-    if (!equals(expected.status, actualRefReplicatedEvent.status)) {
-      return false;
-    }
-    if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and targetNode \"");
-    buffer.append(expected.targetNode);
-    buffer.append("\" and status \"");
-    buffer.append(expected.status);
-    buffer.append("\" and refStatus \"");
-    buffer.append(expected.refStatus);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
deleted file mode 100644
index d1284e1..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicationDoneEventEquals implements IArgumentMatcher {
-
-  private RefReplicationDoneEvent expected;
-
-  public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicationDoneEvent)) {
-      return false;
-    }
-    RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) {
-      return false;
-    }
-    if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and nodesCount \"");
-    buffer.append(expected.nodesCount);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
deleted file mode 100644
index 111a792..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import java.util.Collection;
-import java.util.Objects;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.eclipse.jgit.transport.RemoteRefUpdate;
-
-public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
-  Collection<RemoteRefUpdate> expectedRemoteRefs;
-
-  public static Collection<RemoteRefUpdate> eqRemoteRef(
-      Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
-    return null;
-  }
-
-  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    this.expectedRemoteRefs = expectedRemoteRefs;
-  }
-
-  @Override
-  public boolean matches(Object argument) {
-    if (!(argument instanceof Collection)) return false;
-
-    @SuppressWarnings("unchecked")
-    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
-
-    if (expectedRemoteRefs.size() != refs.size()) return false;
-    return refs.stream()
-        .allMatch(
-            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("expected:" + expectedRemoteRefs.toString());
-  }
-
-  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
-    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
-        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
-        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
-        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
-        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
-        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
-        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
-        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index 36cc209..f2b027c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
 import java.util.List;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Test;
 
@@ -37,8 +36,10 @@
     config.setString("remote", remoteName, "url", remoteUrl);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
 
     assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
@@ -55,18 +56,19 @@
     config.setString("remote", remoteName2, "url", remoteUrl2);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
 
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
     assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
   }
 
-  private ReplicationFileBasedConfig newReplicationFileBasedConfig()
-      throws ConfigInvalidException, IOException {
+  private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
     ReplicationFileBasedConfig replicationConfig =
-        new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath);
+        new ReplicationFileBasedConfig(sitePaths, pluginDataPath); // no destination factory here
     return replicationConfig;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a6a8ac8..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -24,10 +24,10 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Key;
@@ -100,7 +100,7 @@
 
     assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
 
-    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+    waitUntil(() -> projectExists(Project.nameKey(sourceProject + "replica.git")));
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -115,7 +115,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
 
@@ -164,7 +164,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
 
@@ -267,10 +267,10 @@
     setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThrows(
         InterruptedException.class,
@@ -293,10 +293,10 @@
     reloadConfig();
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -318,7 +318,7 @@
     replicationQueueStart();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -357,6 +357,7 @@
     config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
 
@@ -365,11 +366,11 @@
   }
 
   private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().forceReload();
+    getAutoReloadConfigDecoratorInstance().reload(); // presumably re-reads the config - confirm
   }
 
-  private void shutdownConfig() {
-    getAutoReloadConfigDecoratorInstance().shutdown();
+  private void shutdownDestinations() {
+    getInstance(DestinationsCollection.class).shutdown(); // shuts down the destinations only
   }
 
   private void replicationQueueStart() {
@@ -403,10 +404,18 @@
         .collect(toList());
   }
 
-  private void cleanupReplicationTasks() throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
       for (Path path : files) {
-        path.toFile().delete();
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
       }
     }
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..e0de577 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -15,11 +15,9 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.resetToDefault;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
 import java.net.URISyntaxException;
@@ -35,8 +33,7 @@
 
   @Before
   public void setUp() throws Exception {
-    pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
-    replay(pushResultProcessingMock);
+    pushResultProcessingMock = mock(PushResultProcessing.class);
     replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
@@ -53,52 +50,36 @@
 
   @Test
   public void shouldFireOneReplicationEventWhenNothingToReplicate() {
-    resetToDefault(pushResultProcessingMock);
-
-    // expected event
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected event
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://someHost/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.markAllPushTasksScheduled();
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://someHost1/someRepo.git");
     URIish uri2 = new URIish("git://someHost2/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.increasePushTaskCount("someProject", "someRef");
@@ -107,33 +88,29 @@
         "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject",
+            "someRef",
+            uri2,
+            RefPushResult.FAILED,
+            RemoteRefUpdate.Status.NON_EXISTING);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
     URIish uri2 = new URIish("git://host2/someRepo.git");
     URIish uri3 = new URIish("git://host3/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref1");
@@ -151,24 +128,32 @@
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5);
   }
 
   @Test
   public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("project1", "ref1");
     replicationState.increasePushTaskCount("project2", "ref2");
@@ -177,25 +162,24 @@
         "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref2");
@@ -204,7 +188,17 @@
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test