Merge branch 'stable-3.0'

* stable-3.0:
  Clarify that starred-changes ref is not needed on slaves

Change-Id: I4248889146c31d8f956dd0b3ff7a3c4881b97773
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index acbf763..f3d2103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 
 public interface AdminApi {
   public boolean createProject(Project.NameKey project, String head);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 945f869..c043baf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,127 +14,45 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.FileUtil;
-import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.transport.URIish;
 
 @Singleton
-public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener {
   private static final long RELOAD_DELAY = 120;
   private static final long RELOAD_INTERVAL = 60;
 
   private volatile ReplicationFileBasedConfig currentConfig;
-  private long currentConfigTs;
-  private long lastFailedConfigTs;
 
-  private final SitePaths site;
-  private final Destination.Factory destinationFactory;
-  private final Path pluginDataDir;
-  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
-  // ReplicationConfig
-  private final Provider<ReplicationQueue> replicationQueue;
   private final ScheduledExecutorService autoReloadExecutor;
   private ScheduledFuture<?> autoReloadRunnable;
-
-  private volatile boolean shuttingDown;
+  private final AutoReloadRunnable reloadRunner;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site,
-      Destination.Factory destinationFactory,
-      Provider<ReplicationQueue> replicationQueue,
-      @PluginData Path pluginDataDir,
       @PluginName String pluginName,
-      WorkQueue workQueue)
-      throws ConfigInvalidException, IOException {
-    this.site = site;
-    this.destinationFactory = destinationFactory;
-    this.pluginDataDir = pluginDataDir;
-    this.currentConfig = loadConfig();
-    this.currentConfigTs = getLastModified(currentConfig);
-    this.replicationQueue = replicationQueue;
+      WorkQueue workQueue,
+      ReplicationFileBasedConfig replicationConfig,
+      AutoReloadRunnable reloadRunner,
+      EventBus eventBus) {
+    this.currentConfig = replicationConfig;
     this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config");
-  }
-
-  private static long getLastModified(ReplicationFileBasedConfig cfg) {
-    return FileUtil.lastModified(cfg.getCfgPath());
-  }
-
-  private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
-  }
-
-  private synchronized boolean isAutoReload() {
-    return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false);
-  }
-
-  @Override
-  public synchronized List<Destination> getDestinations(FilterType filterType) {
-    return currentConfig.getDestinations(filterType);
-  }
-
-  @Override
-  public synchronized Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    return currentConfig.getURIs(remoteName, projectName, filterType);
-  }
-
-  private synchronized void reloadIfNeeded() {
-    reload(false);
+    this.reloadRunner = reloadRunner;
+    eventBus.register(this);
   }
 
   @VisibleForTesting
-  public void forceReload() {
-    reload(true);
-  }
-
-  private void reload(boolean force) {
-    if (force || isAutoReload()) {
-      ReplicationQueue queue = replicationQueue.get();
-
-      long lastModified = getLastModified(currentConfig);
-      try {
-        if (force
-            || (!shuttingDown
-                && lastModified > currentConfigTs
-                && lastModified > lastFailedConfigTs
-                && queue.isRunning()
-                && !queue.isReplaying())) {
-          queue.stop();
-          currentConfig = loadConfig();
-          currentConfigTs = lastModified;
-          lastFailedConfigTs = 0;
-          logger.atInfo().log(
-              "Configuration reloaded: %d destinations",
-              currentConfig.getDestinations(FilterType.ALL).size());
-        }
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log(
-            "Cannot reload replication configuration: keeping existing settings");
-        lastFailedConfigTs = lastModified;
-        return;
-      } finally {
-        queue.start();
-      }
-    }
+  public void reload() {
+    reloadRunner.reload();
   }
 
   @Override
@@ -148,13 +66,13 @@
   }
 
   @Override
-  public synchronized int getMaxRefsToLog() {
-    return currentConfig.getMaxRefsToLog();
+  public int getDistributionInterval() {
+    return currentConfig.getDistributionInterval();
   }
 
   @Override
-  public synchronized boolean isEmpty() {
-    return currentConfig.isEmpty();
+  public synchronized int getMaxRefsToLog() {
+    return currentConfig.getMaxRefsToLog();
   }
 
   @Override
@@ -162,36 +80,32 @@
     return currentConfig.getEventsDirectory();
   }
 
-  /* shutdown() cannot be set as a synchronized method because
-   * it may need to wait for pending events to complete;
-   * e.g. when enabling the drain of replication events before
-   * shutdown.
-   *
-   * As a rule of thumb for synchronized methods, because they
-   * implicitly define a critical section and associated lock,
-   * they should never hold waiting for another resource, otherwise
-   * the risk of deadlock is very high.
-   *
-   * See more background about deadlocks, what they are and how to
-   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
-   */
   @Override
-  public int shutdown() {
-    this.shuttingDown = true;
-    if (autoReloadRunnable != null) {
-      autoReloadRunnable.cancel(false);
-      autoReloadRunnable = null;
-    }
-    return currentConfig.shutdown();
+  public synchronized void start() {
+    autoReloadRunnable =
+        autoReloadExecutor.scheduleAtFixedRate(
+            reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
 
   @Override
-  public synchronized void startup(WorkQueue workQueue) {
-    shuttingDown = false;
-    currentConfig.startup(workQueue);
-    autoReloadRunnable =
-        autoReloadExecutor.scheduleAtFixedRate(
-            this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
+  public synchronized void stop() {
+    if (autoReloadRunnable != null) {
+      if (!autoReloadRunnable.cancel(true)) {
+        throw new IllegalStateException(
+            "Unable to cancel replication reload task: cannot guarantee orderly shutdown");
+      }
+      autoReloadRunnable = null;
+    }
+  }
+
+  @Override
+  public String getVersion() {
+    return currentConfig.getVersion();
+  }
+
+  @Subscribe
+  public void onReload(ReplicationFileBasedConfig newConfig) {
+    currentConfig = newConfig;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
new file mode 100644
index 0000000..a1084a8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
@@ -0,0 +1,87 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.nio.file.Path;
+import java.util.List;
+
+public class AutoReloadRunnable implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final SitePaths site;
+  private final Path pluginDataDir;
+  private final EventBus eventBus;
+  private final Provider<ObservableQueue> queueObserverProvider;
+  private final ReplicationConfigValidator configValidator;
+
+  private ReplicationFileBasedConfig loadedConfig;
+  private String loadedConfigVersion;
+  private String lastFailedConfigVersion;
+
+  @Inject
+  public AutoReloadRunnable(
+      ReplicationConfigValidator configValidator,
+      ReplicationFileBasedConfig config,
+      SitePaths site,
+      @PluginData Path pluginDataDir,
+      EventBus eventBus,
+      Provider<ObservableQueue> queueObserverProvider) {
+    this.loadedConfig = config;
+    this.loadedConfigVersion = config.getVersion();
+    this.lastFailedConfigVersion = "";
+    this.site = site;
+    this.pluginDataDir = pluginDataDir;
+    this.eventBus = eventBus;
+    this.queueObserverProvider = queueObserverProvider;
+    this.configValidator = configValidator;
+  }
+
+  @Override
+  public synchronized void run() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    ObservableQueue queue = queueObserverProvider.get();
+    if (pendingConfigVersion.equals(loadedConfigVersion)
+        || pendingConfigVersion.equals(lastFailedConfigVersion)
+        || !queue.isRunning()
+        || queue.isReplaying()) {
+      return;
+    }
+
+    reload();
+  }
+
+  synchronized void reload() {
+    String pendingConfigVersion = loadedConfig.getVersion();
+    try {
+      ReplicationFileBasedConfig newConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+      final List<RemoteConfiguration> newValidDestinations =
+          configValidator.validateConfig(newConfig);
+      loadedConfig = newConfig;
+      loadedConfigVersion = newConfig.getVersion();
+      lastFailedConfigVersion = "";
+      eventBus.post(newValidDestinations);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot reload replication configuration: keeping existing settings");
+      lastFailedConfigVersion = pendingConfigVersion;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index a8dede3..fa26e82 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
@@ -31,7 +31,7 @@
   }
 
   private final RemoteConfig config;
-  private final ReplicationConfig replicationConfig;
+  private final DestinationsCollection destinations;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
   private final Project.NameKey project;
   private final String head;
@@ -39,21 +39,20 @@
   @Inject
   CreateProjectTask(
       RemoteConfig config,
-      ReplicationConfig replicationConfig,
+      DestinationsCollection destinations,
       DynamicItem<AdminApiFactory> adminApiFactory,
       @Assisted Project.NameKey project,
       @Assisted String head) {
     this.config = config;
-    this.replicationConfig = replicationConfig;
+    this.destinations = destinations;
     this.adminApiFactory = adminApiFactory;
     this.project = project;
     this.head = head;
   }
 
   public boolean create() {
-    return replicationConfig
-        .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values()
-        .stream()
+    return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION)
+        .values().stream()
         .map(u -> createProject(u, project, head))
         .reduce(true, (a, b) -> a && b);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index f9b2ad7..4617672 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 679776f..1524c2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,13 +26,13 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.reviewdb.client.AccountGroup;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.PluginUser;
 import com.google.gerrit.server.account.GroupBackend;
@@ -60,11 +60,11 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -522,6 +522,7 @@
         pending.put(uri, pushOp);
         switch (reason) {
           case COLLISION:
+            replicationTasksStorage.get().reset(pushOp);
             @SuppressWarnings("unused")
             ScheduledFuture<?> ignored =
                 pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
@@ -536,6 +537,7 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
+              replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -562,34 +564,30 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne task) {
+  void notifyFinished(PushOne op) {
     synchronized (stateLock) {
-      inFlight.remove(task.getURI());
-      if (!task.wasCanceled()) {
-        for (String ref : task.getRefs()) {
-          if (!refHasPendingPush(task.getURI(), ref)) {
-            replicationTasksStorage
-                .get()
-                .delete(
-                    new ReplicateRefUpdate(
-                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
-          }
-        }
-      }
+      replicationTasksStorage.get().finish(op);
+      inFlight.remove(op.getURI());
     }
   }
 
-  private boolean refHasPendingPush(URIish opUri, String ref) {
-    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
-  }
-
-  private boolean pushContainsRef(PushOne op, String ref) {
-    return op != null && op.getRefs().contains(ref);
+  public Set<String> getPrunableTaskNames() {
+    Set<String> names = new HashSet<>();
+    for (PushOne push : pending.values()) {
+      if (!replicationTasksStorage.get().isWaiting(push)) {
+        repLog.debug("No longer isWaiting, can prune " + push.getURI());
+        names.add(push.toString());
+      }
+    }
+    return names;
   }
 
   boolean wouldPushProject(Project.NameKey project) {
@@ -607,27 +605,16 @@
   }
 
   boolean isSingleProjectMatch() {
-    List<String> projects = config.getProjects();
-    boolean ret = (projects.size() == 1);
-    if (ret) {
-      String projectMatch = projects.get(0);
-      if (ReplicationFilter.getPatternType(projectMatch)
-          != ReplicationFilter.PatternType.EXACT_MATCH) {
-        // projectMatch is either regular expression, or wild-card.
-        //
-        // Even though they might refer to a single project now, they need not
-        // after new projects have been created. Hence, we do not treat them as
-        // matching a single project.
-        ret = false;
-      }
-    }
-    return ret;
+    return config.isSingleProjectMatch();
   }
 
   boolean wouldPushRef(String ref) {
     if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
       return false;
     }
+    if (PushOne.ALL_REFS.equals(ref)) {
+      return true;
+    }
     for (RefSpec s : config.getRemoteConfig().getPushRefSpecs()) {
       if (s.matchSource(ref)) {
         return true;
@@ -666,7 +653,7 @@
         } else if (!remoteNameStyle.equals("slash")) {
           repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
         }
-        String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
+        String replacedPath = replaceName(uri.getPath(), name, config.isSingleProjectMatch());
         if (replacedPath != null) {
           uri = uri.setPath(replacedPath);
           r.add(uri);
@@ -731,6 +718,10 @@
     return config.getDelay() * 1000;
   }
 
+  int getSlowLatencyThreshold() {
+    return config.getSlowLatencyThreshold();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
@@ -750,7 +741,7 @@
       ReplicationScheduledEvent event =
           new ReplicationScheduledEvent(project.get(), ref, targetNode);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
@@ -764,7 +755,7 @@
       RefReplicatedEvent event =
           new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
       try {
-        eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
         repLog.error("error posting event", e);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index f688cfc..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -16,13 +16,16 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-public class DestinationConfiguration {
+public class DestinationConfiguration implements RemoteConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
+  private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,7 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
+  private final int slowLatencyThreshold;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -67,16 +71,29 @@
     maxRetries =
         getInt(
             remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+
+    slowLatencyThreshold =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "remote",
+                remoteConfig.getName(),
+                "slowLatencyThreshold",
+                DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+                TimeUnit.SECONDS);
   }
 
+  @Override
   public int getDelay() {
     return delay;
   }
 
+  @Override
   public int getRescheduleDelay() {
     return rescheduleDelay;
   }
 
+  @Override
   public int getRetryDelay() {
     return retryDelay;
   }
@@ -93,26 +110,32 @@
     return lockErrorMaxRetries;
   }
 
+  @Override
   public ImmutableList<String> getUrls() {
     return urls;
   }
 
+  @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
   }
 
+  @Override
   public ImmutableList<String> getProjects() {
     return projects;
   }
 
+  @Override
   public ImmutableList<String> getAuthGroupNames() {
     return authGroupNames;
   }
 
+  @Override
   public String getRemoteNameStyle() {
     return remoteNameStyle;
   }
 
+  @Override
   public boolean replicatePermissions() {
     return replicatePermissions;
   }
@@ -129,10 +152,12 @@
     return replicateHiddenProjects;
   }
 
+  @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
   }
 
+  @Override
   public int getMaxRetries() {
     return maxRetries;
   }
@@ -140,4 +165,9 @@
   private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
     return cfg.getInt("remote", rc.getName(), name, defValue);
   }
+
+  @Override
+  public int getSlowLatencyThreshold() {
+    return slowLatencyThreshold;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
new file mode 100644
index 0000000..8aa3919
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -0,0 +1,348 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Factory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class DestinationsCollection implements ReplicationDestinations, ReplicationConfigValidator {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Factory destinationFactory;
+  private final Provider<ReplicationQueue> replicationQueue;
+  private volatile List<Destination> destinations;
+  private boolean shuttingDown;
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
+  @Inject
+  public DestinationsCollection(
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
+      ReplicationFileBasedConfig replicationConfig,
+      EventBus eventBus)
+      throws ConfigInvalidException {
+    this.destinationFactory = destinationFactory;
+    this.replicationQueue = replicationQueue;
+    this.destinations = allDestinations(destinationFactory, validateConfig(replicationConfig));
+    eventBus.register(this);
+  }
+
+  @Override
+  public Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
+    if (getAll(filterType).isEmpty()) {
+      return ImmutableMultimap.of();
+    }
+
+    SetMultimap<Destination, URIish> uris = HashMultimap.create();
+    for (Destination config : getAll(filterType)) {
+      if (!config.wouldPushProject(projectName)) {
+        continue;
+      }
+
+      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
+        continue;
+      }
+
+      boolean adminURLUsed = false;
+
+      for (String url : config.getAdminUrls()) {
+        if (Strings.isNullOrEmpty(url)) {
+          continue;
+        }
+
+        URIish uri;
+        try {
+          uri = new URIish(url);
+        } catch (URISyntaxException e) {
+          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+          continue;
+        }
+
+        if (!isGerrit(uri) && !isGerritHttp(uri)) {
+          String path =
+              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
+          if (path == null) {
+            repLog.warn("adminURL {} does not contain ${name}", uri);
+            continue;
+          }
+
+          uri = uri.setPath(path);
+          if (!isSSH(uri)) {
+            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+            continue;
+          }
+        }
+        uris.put(config, uri);
+        adminURLUsed = true;
+      }
+
+      if (!adminURLUsed) {
+        for (URIish uri : config.getURIs(projectName, "*")) {
+          uris.put(config, uri);
+        }
+      }
+    }
+    return uris;
+  }
+
+  @Override
+  public synchronized List<Destination> getAll(FilterType filterType) {
+    Predicate<? super Destination> filter;
+    switch (filterType) {
+      case PROJECT_CREATION:
+        filter = dest -> dest.isCreateMissingRepos();
+        break;
+      case PROJECT_DELETION:
+        filter = dest -> dest.isReplicateProjectDeletions();
+        break;
+      case ALL:
+      default:
+        filter = dest -> true;
+        break;
+    }
+    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return destinations.isEmpty();
+  }
+
+  @Override
+  public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
+    for (Destination cfg : destinations) {
+      cfg.start(workQueue);
+    }
+  }
+
+  /* shutdown() must not be declared synchronized because
+   * it may need to wait for pending events to complete,
+   * e.g. when draining replication events before shutdown
+   * is enabled.
+   *
+   * As a rule of thumb, a synchronized method implicitly
+   * defines a critical section and its associated lock, so
+   * it should never block waiting for another resource while
+   * holding that lock; otherwise the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    synchronized (this) {
+      shuttingDown = true;
+    }
+
+    int discarded = 0;
+    for (Destination cfg : destinations) {
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
+    }
+    return discarded;
+  }
+
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return; // draining disabled: do not wait and do not report leftover events
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    try {
+      for (; (inFlight > 0 || pending > 0) && drainQueueAttempts > 0; drainQueueAttempts--) {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        Thread.sleep(destination.getReplicationDelaySeconds()); // NOTE(review): sleep() takes ms
+        pending = destination.getQueueInfo().pending.size();
+        inFlight = destination.getQueueInfo().inFlight.size();
+      }
+    } catch (InterruptedException ie) {
+      logger.atWarning().withCause(ie).log(
+          "Wait for replication events to drain has been interrupted");
+      Thread.currentThread().interrupt(); // stop waiting and keep the interrupt visible to callers
+    }
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  @Subscribe
+  public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
+    if (shuttingDown) { // set by shutdown(), cleared again by startup()
+      logger.atWarning().log("Shutting down: configuration reload ignored");
+      return;
+    }
+    // Rebuild the destinations with the queue stopped; the queue is restarted even on failure.
+    try {
+      replicationQueue.get().stop();
+      destinations = allDestinations(destinationFactory, remoteConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
+    } finally {
+      replicationQueue.get().start();
+    }
+  }
+
+  /**
+   * Loads {@code replicationConfig} from its file and builds one configuration per remote that
+   * declares URLs.
+   * @return the validated remotes; an empty list when the config file is missing or empty
+   * @throws ConfigInvalidException if the file cannot be read or parsed, a remote has an invalid
+   *     URL, or a URL lacks ${name} although the remote is not a single-project match
+   */
+  @Override
+  public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig replicationConfig)
+      throws ConfigInvalidException {
+    FileBasedConfig cfg = replicationConfig.getConfig();
+    if (!cfg.getFile().exists()) {
+      logger.atWarning().log("Config file %s does not exist; not replicating", cfg.getFile());
+      return Collections.emptyList();
+    }
+    if (cfg.getFile().length() == 0) {
+      logger.atInfo().log("Config file %s is empty; not replicating", cfg.getFile());
+      return Collections.emptyList();
+    }
+
+    try {
+      cfg.load();
+    } catch (ConfigInvalidException e) {
+      throw new ConfigInvalidException(
+          String.format("Config file %s is invalid: %s", cfg.getFile(), e.getMessage()), e);
+    } catch (IOException e) {
+      throw new ConfigInvalidException(
+          String.format("Cannot read %s: %s", cfg.getFile(), e.getMessage()), e);
+    }
+
+    boolean defaultForceUpdate = cfg.getBoolean("gerrit", "defaultForceUpdate", false);
+
+    ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
+    for (RemoteConfig c : allRemotes(cfg)) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // If destination for push is not set assume equal to source. RefSpec is immutable:
+      // setDestination() returns a modified copy, so the list of push specs has to be replaced.
+      c.setPushRefSpecs(
+          c.getPushRefSpecs().stream()
+              .map(ref -> ref.getDestination() == null ? ref.setDestination(ref.getSource()) : ref)
+              .collect(toList()));
+
+      // Without explicit push specs, default to replicating every ref.
+      if (c.getPushRefSpecs().isEmpty()) {
+        c.addPushRefSpec(
+            new RefSpec()
+                .setSourceDestination("refs/*", "refs/*")
+                .setForceUpdate(defaultForceUpdate));
+      }
+
+      DestinationConfiguration destinationConfiguration = new DestinationConfiguration(c, cfg);
+
+      // Unless the remote matches exactly one project, every URL must carry ${name}.
+      if (!destinationConfiguration.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format(
+                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
+                    c.getName(), u, cfg.getFile()));
+          }
+        }
+      }
+
+      confs.add(destinationConfiguration);
+    }
+
+    return confs.build();
+  }
+
+  /**
+   * Creates one destination per {@link DestinationConfiguration}; other entries are skipped.
+   */
+  private List<Destination> allDestinations(
+      Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
+    return remoteConfigurations.stream()
+        .filter(DestinationConfiguration.class::isInstance)
+        .map(DestinationConfiguration.class::cast)
+        .map(destinationFactory::create)
+        .collect(ImmutableList.toImmutableList());
+  }
+
+  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
+    Set<String> names = cfg.getSubsections("remote"); // one entry per "remote" subsection
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        result.add(new RemoteConfig(cfg, name));
+      } catch (URISyntaxException e) { // chain the cause so the URL parse detail is not lost
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index aaf2b15..f910a40 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -18,8 +18,8 @@
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
 import com.google.common.base.Charsets;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 56cff5a..d37321a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
 import java.io.IOException;
 import java.io.OutputStream;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index fa17dce..d337883 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -40,11 +40,11 @@
   @Option(name = "--json", usage = "output in json format")
   private boolean json;
 
-  @Inject private ReplicationConfig config;
+  @Inject private ReplicationDestinations destinations;
 
   @Override
   protected void run() {
-    for (Destination d : config.getDestinations(FilterType.ALL)) {
+    for (Destination d : destinations.getAll(FilterType.ALL)) {
       if (matches(d.getRemoteConfigName())) {
         printRemote(d);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index aa6e16c..da960e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.File;
 import java.io.IOException;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
new file mode 100644
index 0000000..1007ae5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+/** Allows the activity of a queue to be observed. */
+public interface ObservableQueue {
+  /**
+   * Indicates whether the observed queue is running.
+   *
+   * @return {@code true} when the queue is running, {@code false} otherwise
+   */
+  boolean isRunning();
+
+  /**
+   * Indicates whether the observed queue is replaying queued events.
+   *
+   * @return {@code true} when the queue is replaying, {@code false} otherwise
+   */
+  boolean isReplaying();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 833b02b..4f60319 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.Nullable;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 56cecfe..d31ae3b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -16,19 +16,20 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toMap;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
@@ -318,6 +319,8 @@
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
         repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.info("PushOp for replication to {} was denied externally", uri);
       } else {
         repLog.info(
             "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
@@ -329,14 +332,19 @@
     }
 
     repLog.info("Replication to {} started...", uri);
-    Timer1.Context context = metrics.start(config.getName());
+    Timer1.Context<String> destinationContext = metrics.start(config.getName());
     try {
-      long startedAt = context.getStartTime();
+      long startedAt = destinationContext.getStartTime();
       long delay = NANOSECONDS.toMillis(startedAt - createdAt);
       metrics.record(config.getName(), delay, retryCount);
       git = gitManager.openRepository(projectName);
       runImpl();
-      long elapsed = NANOSECONDS.toMillis(context.stop());
+      long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
+
+      if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
+        metrics.recordSlowProjectReplication(
+            config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
+      }
       repLog.info(
           "Replication to {} completed in {}ms, {}ms delay, {} retries",
           uri,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index fccdb7b..d1ab790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -14,9 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.util.Objects;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
@@ -45,11 +46,40 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  /**
+   * Compares this event with another object for equality.
+   *
+   * <p>Two events are equal when both are {@link RefReplicatedEvent}s and their project, ref,
+   * target node, status and ref status are all equal; {@code null} fields only match
+   * {@code null} fields.
+   *
+   * @param other the object to compare with; may be {@code null}
+   * @return {@code true} if {@code other} is an equal event, {@code false} otherwise
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicatedEvent)) {
+      return false;
+    }
+
+    RefReplicatedEvent that = (RefReplicatedEvent) other;
+    return Objects.equals(that.project, this.project)
+        && Objects.equals(that.ref, this.ref)
+        && Objects.equals(that.targetNode, this.targetNode)
+        && Objects.equals(that.status, this.status)
+        && Objects.equals(that.refStatus, this.refStatus);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, targetNode, status, refStatus); // same fields as equals()
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 4789a96..e663194 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
 
 public class RefReplicationDoneEvent extends RefEvent {
   public static final String TYPE = "ref-replication-done";
@@ -33,11 +34,35 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 
   @Override
   public String getRefName() {
     return ref;
   }
+
+  /**
+   * Compares this event with another object for equality.
+   *
+   * @param other the object to compare with; may be {@code null}
+   * @return {@code true} if {@code other} is a {@link RefReplicationDoneEvent} with an equal
+   *     project, an equal ref and the same nodes count, {@code false} otherwise
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RefReplicationDoneEvent)) {
+      return false;
+    }
+
+    RefReplicationDoneEvent that = (RefReplicationDoneEvent) other;
+    return Objects.equals(that.project, this.project)
+        && Objects.equals(that.ref, this.ref)
+        && that.nodesCount == this.nodesCount;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, nodesCount); // same fields as equals()
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
new file mode 100644
index 0000000..b66e73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+/** Remote configuration for a replication endpoint. */
+public interface RemoteConfiguration {
+  /**
+   * Time to wait before scheduling a remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getDelay();
+  /**
+   * Time to wait before rescheduling a remote replication operation, which might have failed the
+   * first time round. Setting to 0 effectively disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRescheduleDelay();
+  /**
+   * Time to wait before retrying a failed remote replication operation. Setting to 0 effectively
+   * disables the delay.
+   *
+   * @return the delay value in seconds
+   */
+  int getRetryDelay();
+  /**
+   * List of the remote endpoint addresses used for replication.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getUrls();
+  /**
+   * List of alternative remote endpoint addresses, used for admin operations, such as repository
+   * creation.
+   *
+   * @return list of remote URL strings
+   */
+  ImmutableList<String> getAdminUrls();
+  /**
+   * List of repositories that should be replicated.
+   *
+   * @return list of project strings
+   */
+  ImmutableList<String> getProjects();
+  /**
+   * List of groups that should be used to access the repositories.
+   *
+   * @return list of group strings
+   */
+  ImmutableList<String> getAuthGroupNames();
+  /**
+   * Influence how the name of the remote repository should be computed.
+   *
+   * @return a string representing a remote style name
+   */
+  String getRemoteNameStyle();
+  /**
+   * If true, permissions-only projects and the refs/meta/config branch will also be replicated.
+   *
+   * @return true when permissions are replicated as well, false otherwise
+   */
+  boolean replicatePermissions();
+  /**
+   * The JGit remote configuration representing the replication for this endpoint.
+   *
+   * @return The remote config {@link RemoteConfig}
+   */
+  RemoteConfig getRemoteConfig();
+  /**
+   * Number of times to retry a replication operation.
+   *
+   * @return the number of retries
+   */
+  int getMaxRetries();
+
+  /**
+   * The time duration after which the replication for a project should be considered “slow”.
+   *
+   * @return the slow latency threshold
+   */
+  int getSlowLatencyThreshold();
+
+  /**
+   * Whether the remote configuration is for a single project only.
+   *
+   * @return true, when configuration is for a single project, false otherwise
+   */
+  default boolean isSingleProjectMatch() {
+    List<String> projects = getProjects();
+    boolean ret = (projects.size() == 1);
+    if (ret) {
+      String projectMatch = projects.get(0);
+      if (ReplicationFilter.getPatternType(projectMatch)
+          != ReplicationFilter.PatternType.EXACT_MATCH) {
+        // projectMatch is either regular expression, or wild-card.
+        //
+        // Even though they might refer to a single project now, they need not
+        // after new projects have been created. Hence, we do not treat them as
+        // matching a single project.
+        ret = false;
+      }
+    }
+    return ret;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index ee9d4c0..d4d979f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.io.IOException;
 import java.io.OutputStream;
 import org.eclipse.jgit.transport.URIish;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 929c538..68fe430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -11,44 +11,70 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.common.collect.Multimap;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.git.WorkQueue;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
-import org.eclipse.jgit.transport.URIish;
 
+/** Configuration of all the replication end points. */
 public interface ReplicationConfig {
 
+  /** Filter for accessing replication projects. */
   enum FilterType {
     PROJECT_CREATION,
     PROJECT_DELETION,
     ALL
   }
 
-  List<Destination> getDestinations(FilterType filterType);
-
-  Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
-
+  /**
+   * Returns whether all the projects should be replicated when the plugin starts, as set in the
+   * current replication configuration.
+   *
+   * @return true if replication at plugin start, false otherwise.
+   */
   boolean isReplicateAllOnPluginStart();
 
+  /**
+   * Returns the default behaviour of the replication plugin when pushing to remote replication
+   * ends. Even though the property name has the 'update' suffix, it actually refers to Git push
+   * operation and not to a Git update.
+   *
+   * @return true if forced push is the default, false otherwise.
+   */
   boolean isDefaultForceUpdate();
 
+  /**
+   * Returns the interval in seconds for running task distribution.
+   *
+   * @return number of seconds, zero if never.
+   */
+  int getDistributionInterval();
+
+  /**
+   * Returns the maximum number of ref-specs to log into the replication_log whenever a push
+   * operation is completed against a replication end.
+   *
+   * @return maximum number of refs to log, zero if unlimited.
+   */
   int getMaxRefsToLog();
 
-  boolean isEmpty();
-
+  /**
+   * Configured location where the replication events are stored on the filesystem for being resumed
+   * and kept across restarts.
+   *
+   * @return path to store persisted events.
+   */
   Path getEventsDirectory();
 
-  int shutdown();
-
-  void startup(WorkQueue workQueue);
-
   int getSshConnectionTimeout();
 
   int getSshCommandTimeout();
+
+  /**
+   * Current logical version string of the current configuration loaded in memory, depending on the
+   * actual implementation of the configuration on the persistent storage.
+   *
+   * @return current logical version number.
+   */
+  String getVersion();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
new file mode 100644
index 0000000..7883114
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+
+public interface ReplicationConfigValidator {
+
+  /**
+   * Validates the new replication.config.
+   *
+   * @param newConfig new configuration detected
+   * @return List of validated {@link RemoteConfiguration}
+   * @throws ConfigInvalidException if the new configuration is not valid.
+   */
+  List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
+      throws ConfigInvalidException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
new file mode 100644
index 0000000..b191d7d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.Multimap;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationDestinations {
+
+  /**
+   * Returns all the URIs associated with a project and a filter criterion.
+   *
+   * @param remoteName name of the replication end or empty if selecting all ends.
+   * @param projectName name of the project
+   * @param filterType type of filter criteria for selecting projects
+   * @return the multi-map of destinations and the associated replication URIs
+   */
+  Multimap<Destination, URIish> getURIs(
+      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
+
+  /**
+   * List of currently active replication destinations.
+   *
+   * @param filterType type of filter criteria for selecting destinations
+   * @return the list of active destinations
+   */
+  List<Destination> getAll(FilterType filterType);
+
+  /** @return true if there are no destinations, false otherwise. */
+  boolean isEmpty();
+
+  /**
+   * Start replicating to all destinations.
+   *
+   * @param workQueue execution queue for scheduling the replication events.
+   */
+  void startup(WorkQueue workQueue);
+
+  /**
+   * Stop the replication to all destinations.
+   *
+   * @return number of events cancelled during shutdown.
+   */
+  int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 6476412..d714376 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -13,51 +13,19 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
-import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-
 import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.SetMultimap;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
 
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
-  private List<Destination> destinations;
   private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
@@ -69,177 +37,17 @@
   private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(
-      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
-      throws ConfigInvalidException, IOException {
+  public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) {
     this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
-    this.destinations = allDestinations(destinationFactory);
+    this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
+    this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+    this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
     this.pluginDataDir = pluginDataDir;
   }
 
-  /*
-   * (non-Javadoc)
-   * @see
-   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations
-   * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType)
-   */
-  @Override
-  public List<Destination> getDestinations(FilterType filterType) {
-    Predicate<? super Destination> filter;
-    switch (filterType) {
-      case PROJECT_CREATION:
-        filter = dest -> dest.isCreateMissingRepos();
-        break;
-      case PROJECT_DELETION:
-        filter = dest -> dest.isReplicateProjectDeletions();
-        break;
-      case ALL:
-      default:
-        filter = dest -> true;
-        break;
-    }
-    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
-  }
-
-  private List<Destination> allDestinations(Destination.Factory destinationFactory)
-      throws ConfigInvalidException, IOException {
-    if (!config.getFile().exists()) {
-      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-    if (config.getFile().length() == 0) {
-      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
-      return Collections.emptyList();
-    }
-
-    try {
-      config.load();
-    } catch (ConfigInvalidException e) {
-      throw new ConfigInvalidException(
-          String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e);
-    } catch (IOException e) {
-      throw new IOException(
-          String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
-    }
-
-    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
-
-    defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
-
-    maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
-
-    sshCommandTimeout =
-        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
-    sshConnectionTimeout =
-        (int)
-            ConfigUtil.getTimeUnit(
-                config,
-                "gerrit",
-                null,
-                "sshConnectionTimeout",
-                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
-                MILLISECONDS);
-
-    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
-    for (RemoteConfig c : allRemotes(config)) {
-      if (c.getURIs().isEmpty()) {
-        continue;
-      }
-
-      // If destination for push is not set assume equal to source.
-      for (RefSpec ref : c.getPushRefSpecs()) {
-        if (ref.getDestination() == null) {
-          ref.setDestination(ref.getSource());
-        }
-      }
-
-      if (c.getPushRefSpecs().isEmpty()) {
-        c.addPushRefSpec(
-            new RefSpec()
-                .setSourceDestination("refs/*", "refs/*")
-                .setForceUpdate(defaultForceUpdate));
-      }
-
-      Destination destination = destinationFactory.create(new DestinationConfiguration(c, config));
-
-      if (!destination.isSingleProjectMatch()) {
-        for (URIish u : c.getURIs()) {
-          if (u.getPath() == null || !u.getPath().contains("${name}")) {
-            throw new ConfigInvalidException(
-                String.format(
-                    "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
-                    c.getName(), u, config.getFile()));
-          }
-        }
-      }
-
-      dest.add(destination);
-    }
-    return dest.build();
-  }
-
-  @Override
-  public Multimap<Destination, URIish> getURIs(
-      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
-    if (getDestinations(filterType).isEmpty()) {
-      return ImmutableMultimap.of();
-    }
-
-    SetMultimap<Destination, URIish> uris = HashMultimap.create();
-    for (Destination config : getDestinations(filterType)) {
-      if (!config.wouldPushProject(projectName)) {
-        continue;
-      }
-
-      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
-        continue;
-      }
-
-      boolean adminURLUsed = false;
-
-      for (String url : config.getAdminUrls()) {
-        if (Strings.isNullOrEmpty(url)) {
-          continue;
-        }
-
-        URIish uri;
-        try {
-          uri = new URIish(url);
-        } catch (URISyntaxException e) {
-          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
-          continue;
-        }
-
-        if (!isGerrit(uri) && !isGerritHttp(uri)) {
-          String path =
-              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
-          if (path == null) {
-            repLog.warn("adminURL {} does not contain ${name}", uri);
-            continue;
-          }
-
-          uri = uri.setPath(path);
-          if (!isSSH(uri)) {
-            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
-            continue;
-          }
-        }
-        uris.put(config, uri);
-        adminURLUsed = true;
-      }
-
-      if (!adminURLUsed) {
-        for (URIish uri : config.getURIs(projectName, "*")) {
-          uris.put(config, uri);
-        }
-      }
-    }
-    return uris;
-  }
-
-  static String replaceName(String in, String name, boolean keyIsOptional) {
+  public static String replaceName(String in, String name, boolean keyIsOptional) {
     String key = "${name}";
     int n = in.indexOf(key);
     if (0 <= n) {
@@ -268,32 +76,15 @@
   }
 
   @Override
+  public int getDistributionInterval() {
+    return config.getInt("replication", "distributionInterval", 0);
+  }
+
+  @Override
   public int getMaxRefsToLog() {
     return maxRefsToLog;
   }
 
-  private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
-    Set<String> names = cfg.getSubsections("remote");
-    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      try {
-        result.add(new RemoteConfig(cfg, name));
-      } catch (URISyntaxException e) {
-        throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
-      }
-    }
-    return result;
-  }
-
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty()
-   */
-  @Override
-  public boolean isEmpty() {
-    return destinations.isEmpty();
-  }
-
   @Override
   public Path getEventsDirectory() {
     String eventsDirectory = config.getString("replication", null, "eventsDirectory");
@@ -307,67 +98,13 @@
     return cfgPath;
   }
 
-  @Override
-  public int shutdown() {
-    int discarded = 0;
-    for (Destination cfg : destinations) {
-      try {
-        drainReplicationEvents(cfg);
-      } catch (EventQueueNotEmptyException e) {
-        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
-      } finally {
-        discarded += cfg.shutdown();
-      }
-    }
-    return discarded;
-  }
-
-  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
-    int drainQueueAttempts = destination.getDrainQueueAttempts();
-    if (drainQueueAttempts == 0) {
-      return;
-    }
-    int pending = destination.getQueueInfo().pending.size();
-    int inFlight = destination.getQueueInfo().inFlight.size();
-
-    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
-      try {
-        logger.atInfo().log(
-            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
-            inFlight, pending);
-        Thread.sleep(destination.getReplicationDelaySeconds());
-      } catch (InterruptedException ie) {
-        logger.atWarning().withCause(ie).log(
-            "Wait for replication events to drain has been interrupted");
-      }
-      pending = destination.getQueueInfo().pending.size();
-      inFlight = destination.getQueueInfo().inFlight.size();
-      drainQueueAttempts--;
-    }
-
-    if (pending > 0 || inFlight > 0) {
-      throw new EventQueueNotEmptyException(
-          String.format("Pending: %d - InFlight: %d", pending, inFlight));
-    }
-  }
-
-  public static class EventQueueNotEmptyException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EventQueueNotEmptyException(String errorMessage) {
-      super(errorMessage);
-    }
-  }
-
-  FileBasedConfig getConfig() {
+  public FileBasedConfig getConfig() {
     return config;
   }
 
   @Override
-  public void startup(WorkQueue workQueue) {
-    for (Destination cfg : destinations) {
-      cfg.start(workQueue);
-    }
+  public String getVersion() {
+    return Long.toString(config.getFile().lastModified());
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 05bbb03..5b4204e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import java.util.Collections;
 import java.util.List;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
index afc7926..1bc17ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
@@ -14,11 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.Histogram3;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -27,10 +30,37 @@
   private final Timer1<String> executionTime;
   private final Histogram1<String> executionDelay;
   private final Histogram1<String> executionRetries;
+  private final Histogram3<Integer, String, String> slowProjectReplicationLatency;
 
   @Inject
-  ReplicationMetrics(MetricMaker metricMaker) {
-    Field<String> DEST_FIELD = Field.ofString("destination");
+  ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> DEST_FIELD =
+        Field.ofString(
+                "destination",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("destination", fieldValue)))
+            .build();
+
+    Field<String> PROJECT_FIELD =
+        Field.ofString(
+                "project",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("project", fieldValue)))
+            .build();
+
+    Field<Integer> SLOW_THRESHOLD_FIELD =
+        Field.ofInteger(
+                "slow_threshold",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(
+                            PluginMetadata.create("slow_threshold", fieldValue.toString())))
+            .build();
 
     executionTime =
         metricMaker.newTimer(
@@ -55,6 +85,17 @@
                 .setCumulative()
                 .setUnit("retries"),
             DEST_FIELD);
+
+    slowProjectReplicationLatency =
+        metricMaker.newHistogram(
+            "latency_slower_than_threshold" + "",
+            new Description(
+                    "latency for project to destination, where latency was slower than threshold")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SLOW_THRESHOLD_FIELD,
+            PROJECT_FIELD,
+            DEST_FIELD);
   }
 
   /**
@@ -63,7 +104,7 @@
    * @param name the destination name.
    * @return the timer context.
    */
-  Timer1.Context start(String name) {
+  Timer1.Context<String> start(String name) {
     return executionTime.start(name);
   }
 
@@ -78,4 +119,17 @@
     executionDelay.record(name, delay);
     executionRetries.record(name, retries);
   }
+
+  /**
+   * Record replication latency for project to destination, where latency was slower than threshold
+   *
+   * @param destinationName the destination name.
+   * @param projectName the project name.
+   * @param slowThreshold the threshold the latency exceeded (unit not visible here; confirm).
+   * @param latency the replication latency in milliseconds.
+   */
+  void recordSlowProjectReplication(
+      String destinationName, String projectName, Integer slowThreshold, long latency) {
+    slowProjectReplicationLatency.record(slowThreshold, projectName, destinationName, latency);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 835d068..979c8e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -23,18 +24,35 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.util.FS;
 
 class ReplicationModule extends AbstractModule {
+  private final Path cfgPath;
+
+  @Inject
+  public ReplicationModule(SitePaths site) {
+    cfgPath = site.etc_dir.resolve("replication.config");
+  }
+
   @Override
   protected void configure() {
     install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(ObservableQueue.class).to(ReplicationQueue.class);
     bind(LifecycleListener.class)
         .annotatedWith(UniqueAnnotations.create())
         .to(ReplicationQueue.class);
@@ -57,7 +75,19 @@
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
 
-    bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+    bind(EventBus.class).in(Scopes.SINGLETON);
+    bind(ReplicationDestinations.class).to(DestinationsCollection.class);
+    bind(ReplicationConfigValidator.class).to(DestinationsCollection.class);
+
+    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+      bind(LifecycleListener.class)
+          .annotatedWith(UniqueAnnotations.create())
+          .to(AutoReloadConfigDecorator.class);
+    } else {
+      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
@@ -68,4 +98,15 @@
 
     bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
+
+  private FileBasedConfig getReplicationConfig() {
+    File replicationConfigFile = cfgPath.toFile();
+    FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+    }
+    return config;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 8030d28..0dcfc95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,18 +14,21 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
@@ -33,13 +36,16 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
-    implements LifecycleListener,
+    implements ObservableQueue,
+        LifecycleListener,
         GitReferenceUpdatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
@@ -48,24 +54,28 @@
 
   private final ReplicationStateListener stateLog;
 
+  private final ReplicationConfig replConfig;
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
-  private final ReplicationConfig config;
+  private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private Distributor distributor;
 
   @Inject
   ReplicationQueue(
-      WorkQueue wq,
       ReplicationConfig rc,
+      WorkQueue wq,
+      Provider<ReplicationDestinations> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       ReplicationTasksStorage rts) {
+    replConfig = rc;
     workQueue = wq;
     dispatcher = dis;
-    config = rc;
+    destinations = rd;
     stateLog = sl;
     replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
@@ -74,26 +84,31 @@
   @Override
   public void start() {
     if (!running) {
-      config.startup(workQueue);
+      destinations.get().startup(workQueue);
       running = true;
+      replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
+      distributor = new Distributor(workQueue);
     }
   }
 
   @Override
   public void stop() {
     running = false;
-    int discarded = config.shutdown();
+    distributor.stop();
+    int discarded = destinations.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
   }
 
+  @Override
   public boolean isRunning() {
     return running;
   }
 
+  @Override
   public boolean isReplaying() {
     return replaying;
   }
@@ -105,49 +120,46 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
-      return;
-    }
-
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project)) {
-        for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(
-                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
-        }
-      }
-    }
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName(), false);
   }
 
-  private void onGitReferenceUpdated(String projectName, String refName) {
+  private void fire(String projectName, String refName, boolean isPersisted) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
+    state.markAllPushTasksScheduled();
+  }
+
+  private void fire(
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now,
+      boolean isPersisted) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(projectName);
-    for (Destination cfg : config.getDestinations(FilterType.ALL)) {
+    for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        for (URIish uri : cfg.getURIs(project, null)) {
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
-          cfg.schedule(project, refName, uri, state);
+        for (URIish uri : cfg.getURIs(project, urlMatch)) {
+          if (!isPersisted) {
+            replicationTasksStorage.create(
+                new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          }
+          cfg.schedule(project, refName, uri, state, now);
         }
       }
     }
-    state.markAllPushTasksScheduled();
   }
 
   private void firePendingEvents() {
@@ -155,11 +167,11 @@
     try {
       Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.info("Firing pending task {}", eventKey);
-          onGitReferenceUpdated(t.project, t.ref);
+          fire(t.project, t.ref, true);
           eventsReplayed.add(eventKey);
         }
       }
@@ -168,17 +180,41 @@
     }
   }
 
+  private void pruneCompleted() {
+    // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+    // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
+    // PushOnes with queue tasks despite their wrappers.
+    Set<String> prunableTaskNames = new HashSet<>();
+    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+      prunableTaskNames.addAll(destination.getPrunableTaskNames());
+    }
+
+    for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+      WorkQueue.Task.State state = task.getState();
+      if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+        if (task instanceof WorkQueue.ProjectTask) {
+          if (prunableTaskNames.contains(task.toString())) {
+            repLog.debug("Pruning externally completed task:" + task);
+            task.cancel(false);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
         .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
   }
 
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
-    Project.NameKey p = new Project.NameKey(event.getProjectName());
-    config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
+    Project.NameKey p = Project.nameKey(event.getProjectName());
+    destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
         .forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
   }
 
@@ -188,7 +224,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName(), false);
         eventsReplayed.add(eventKey);
       }
     }
@@ -205,4 +241,49 @@
 
     public abstract String refName();
   }
+
+  protected class Distributor implements WorkQueue.CancelableRunnable {
+    public ScheduledThreadPoolExecutor executor;
+    public ScheduledFuture<?> future;
+
+    public Distributor(WorkQueue wq) {
+      int distributionInterval = replConfig.getDistributionInterval();
+      if (distributionInterval > 0) {
+        executor = wq.createQueue(1, "Replication Distribution", false);
+        future =
+            executor.scheduleWithFixedDelay(
+                this, distributionInterval, distributionInterval, SECONDS);
+      }
+    }
+
+    @Override
+    public void run() {
+      if (!running) {
+        return;
+      }
+      try {
+        firePendingEvents();
+        pruneCompleted();
+      } catch (Exception e) {
+        repLog.error("error distributing tasks", e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      future.cancel(true);
+    }
+
+    public void stop() {
+      if (executor != null) {
+        cancel();
+        executor.getQueue().remove(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Replication Distributor";
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index aa965fe..28f6b6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,6 +38,6 @@
 
   @Override
   public Project.NameKey getProjectNameKey() {
-    return new Project.NameKey(project);
+    return Project.nameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 64397f9..992cf5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,63 +25,108 @@
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under <replication_data>/ref-updates where replication_data is
+ * determined by the replication.eventsDirectory config option and defaults to
+ * <site_dir>/data/replication. Atomic renames must be supported from anywhere within the store to
+ * anywhere within the store. This generally means that all the contents of the store need to live
+ * on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <p><code>
+ *   .../building/<tmp_name>             new replication tasks under construction
+ *   .../running/<uri_sha1>/             lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>  running replication tasks
+ *   .../waiting/<task_sha1>             outstanding replication tasks
+ * </code>
+ *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
 @Singleton
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
+
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
+    }
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
+      this.ref = ref;
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
     public final String uri;
     public final String remote;
 
-    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
       this.project = project;
-      this.ref = ref;
-      this.uri = uri.toASCIIString();
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
   private static Gson GSON = new Gson();
 
-  private final Path refUpdates;
+  private final Path buildingUpdates;
+  private final Path runningUpdates;
+  private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
-    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    buildingUpdates = refUpdates.resolve("building");
+    runningUpdates = refUpdates.resolve("running");
+    waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String persist(ReplicateRefUpdate r) {
-    String json = GSON.toJson(r) + "\n";
-    String eventKey = sha1(json).name();
-    Path file = refUpdates().resolve(eventKey);
-
-    if (Files.exists(file)) {
-      return eventKey;
-    }
-
-    try {
-      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.write(file, json.getBytes(UTF_8));
-    } catch (IOException e) {
-      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
-    }
-    return eventKey;
+  public String create(ReplicateRefUpdate r) {
+    return new Task(r).create();
   }
 
   @VisibleForTesting
@@ -89,37 +134,79 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public void delete(ReplicateRefUpdate r) {
-    String taskJson = GSON.toJson(r) + "\n";
-    String taskKey = sha1(taskJson).name();
-    Path file = refUpdates().resolve(taskKey);
-
-    if (disableDeleteForTesting) {
-      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
-      return;
+  public boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
 
-    try {
-      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
-      Files.delete(file);
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+    boolean started = false;
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
     }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
-  public List<ReplicateRefUpdate> list() {
-    ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
-      for (Path e : events) {
-        if (Files.isRegularFile(e)) {
-          String json = new String(Files.readAllBytes(e), UTF_8);
-          result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+  public void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (String ref : push.getRefs()) {
+      new Task(lock, ref).reset();
+    }
+    lock.release();
+  }
+
+  public void resetAll() {
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
         }
       }
     } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error when firing pending events");
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
-    return result;
+  }
+
+  public boolean isWaiting(PushOne push) {
+    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  }
+
+  public void finish(PushOne push) {
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
+    }
+    lock.release();
+  }
+
+  public List<ReplicateRefUpdate> listWaiting() {
+    return list(createDir(waitingUpdates));
+  }
+
+  private List<ReplicateRefUpdate> list(Path tasks) {
+    List<ReplicateRefUpdate> results = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
+      for (Path e : events) {
+        if (Files.isRegularFile(e)) {
+          String json = new String(Files.readAllBytes(e), UTF_8);
+          results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while listing tasks");
+    }
+    return results;
   }
 
   @SuppressWarnings("deprecation")
@@ -127,11 +214,140 @@
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
-  private Path refUpdates() {
+  private static Path createDir(Path dir) {
     try {
-      return Files.createDirectories(refUpdates);
+      return Files.createDirectories(dir);
     } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+      throw new ProvisionException(String.format("Couldn't create %s", dir), e);
+    }
+  }
+
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
+  private class Task {
+    public final ReplicateRefUpdate update;
+    public final String json;
+    public final String taskKey;
+    public final Path running;
+    public final Path waiting;
+
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(PushOne push, String ref) {
+      this(new UriLock(push), ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
+      json = GSON.toJson(update) + "\n";
+      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      taskKey = sha1(key).name();
+      running = lock.runningDir.resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
+    }
+
+    public String create() {
+      if (Files.exists(waiting)) {
+        return taskKey;
+      }
+
+      try {
+        Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
+        logger.atFine().log("CREATE %s %s", tmp, updateLog());
+        Files.write(tmp, json.getBytes(UTF_8));
+        logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog());
+        rename(tmp, waiting);
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log("Couldn't create task %s", json);
+      }
+      return taskKey;
+    }
+
+    public boolean start() {
+      rename(waiting, running);
+      return Files.exists(running);
+    }
+
+    public void reset() {
+      rename(running, waiting);
+    }
+
+    public boolean isWaiting() {
+      return Files.exists(waiting);
+    }
+
+    public void finish() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", running, updateLog());
+        Files.delete(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
+      }
+    }
+
+    private void rename(Path from, Path to) {
+      try {
+        logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 18a4cc2..ed15b92 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -26,11 +26,11 @@
 import org.eclipse.jgit.util.FS;
 
 /** Looks up a remote's password in secure.config. */
-class SecureCredentialsFactory implements CredentialsFactory {
+public class SecureCredentialsFactory implements CredentialsFactory {
   private final Config config;
 
   @Inject
-  SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
+  public SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
     config = load(site);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index 70452b4..ffa6be1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -16,8 +16,8 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 32bc630..19a478d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
 :	Timeout for SSH connections. If 0, there is no timeout and
         the client waits indefinitely. By default, 2 minutes.
 
+replication.distributionInterval
+:	Interval in seconds for running the replication distributor. When
+	run, the replication distributor will add all persisted waiting tasks
+	to the queue to ensure that externally loaded tasks are visible to
+	the current process. If zero, turn off the replication distributor. By
+	default, zero.
+
+	Turning this on is likely only useful when there are other processes
+	(such as other masters in the same cluster) writing to the same
+	persistence store. To ensure that updates are seen well before their
+	replicationDelay expires when the distributor is used, the recommended
+	value for this is approximately the smallest remote.NAME.replicationDelay
+	divided by 5.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.
@@ -142,6 +156,11 @@
 	persisted events will not be deleted. When the plugin is started again,
 	it will trigger all replications found under this directory.
 
+	For replication to work, it is important that atomic renames be possible
+	from within any subdirectory of the eventsDirectory to within any other
+	subdirectory of the eventsDirectory. This generally means that the entire
+	contents of the eventsDirectory should live on the same filesystem.
+
 	When not set, defaults to the plugin's data directory.
 
 remote.NAME.url
@@ -419,6 +438,15 @@
 	By default, replicates without matching, i.e. replicates
 	everything to all remotes.
 
+remote.NAME.slowLatencyThreshold
+:	The time duration after which the replication of a project to this
+	destination will be considered "slow". A slow project replication
+	will cause additional metrics to be exposed for further investigation.
+	See [metrics.md](metrics.md) for further details.
+
+	default: 15 minutes
+
+
 File `secure.config`
 --------------------
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..ef8b150
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,61 @@
+# Metrics
+
+Some metrics are emitted when replication occurs to a remote destination.
+The granularity of the metrics recorded is at destination level; however, when a particular project replication is flagged
+as slow, project-level metrics are recorded as well. This happens when the replication took longer than the allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md)).
+
+The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation,
+could potentially be very large.
+
+### Project level
+
+* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
+
+### Destination level
+
+* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
+* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
+* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
+
+### Example
+```
+# HELP plugins_replication_replication_delay_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destinationName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_delay_destinationName summary
+plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0
+plugins_replication_replication_delay_destinationName_count 3.0
+
+# HELP plugins_replication_replication_retries_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destinationName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_retries_destinationName summary
+plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0
+plugins_replication_replication_retries_destinationName_count 3.0
+
+# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer)
+# TYPE plugins_replication_replication_latency_destinationName summary
+plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002
+plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881
+plugins_replication_replication_latency_destinationName_count 2.0
+
+# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName_count 1.0
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 77dc1cc..3932fb3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -16,26 +16,28 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.file.Files.createTempDirectory;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.common.eventbus.EventBus;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+import com.google.inject.util.Providers;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Ignore;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Ignore
 public abstract class AbstractConfigTest {
@@ -43,6 +45,10 @@
   protected final SitePaths sitePaths;
   protected final Destination.Factory destinationFactoryMock;
   protected final Path pluginDataPath;
+  protected ReplicationQueue replicationQueueMock;
+  protected WorkQueue workQueueMock;
+  protected EventBus eventBus = new EventBus();
+  protected FakeExecutorService executorService = new FakeExecutorService();
 
   static class FakeDestination extends Destination {
     public final DestinationConfiguration config;
@@ -53,11 +59,9 @@
     }
 
     private static Injector injectorMock() {
-      Injector injector = createNiceMock(Injector.class);
-      Injector childInjectorMock = createNiceMock(Injector.class);
-      expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock);
-      replay(childInjectorMock);
-      replay(injector);
+      Injector injector = mock(Injector.class);
+      Injector childInjectorMock = mock(Injector.class);
+      when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock);
       return injector;
     }
   }
@@ -66,21 +70,25 @@
     sitePath = createTempPath("site");
     sitePaths = new SitePaths(sitePath);
     pluginDataPath = createTempPath("data");
-    destinationFactoryMock = createMock(Destination.Factory.class);
+    destinationFactoryMock = mock(Destination.Factory.class);
   }
 
   @Before
   public void setup() {
-    expect(destinationFactoryMock.create(isA(DestinationConfiguration.class)))
-        .andAnswer(
-            new IAnswer<Destination>() {
+    when(destinationFactoryMock.create(any(DestinationConfiguration.class)))
+        .thenAnswer(
+            new Answer<Destination>() {
               @Override
-              public Destination answer() throws Throwable {
-                return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]);
+              public Destination answer(InvocationOnMock invocation) throws Throwable {
+                return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]);
               }
-            })
-        .anyTimes();
-    replay(destinationFactoryMock);
+            });
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+
+    workQueueMock = mock(WorkQueue.class);
+    when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService);
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
@@ -113,4 +121,13 @@
 
     assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls);
   }
+
+  protected DestinationsCollection newDestinationsCollections(
+      ReplicationFileBasedConfig replicationFileBasedConfig) throws ConfigInvalidException {
+    return new DestinationsCollection(
+        destinationFactoryMock,
+        Providers.of(replicationQueueMock),
+        replicationFileBasedConfig,
+        eventBus);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 211cafa..d85f622 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -15,127 +15,19 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
 
-import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.util.Providers;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Before;
 import org.junit.Test;
 
 public class AutoReloadConfigDecoratorTest extends AbstractConfigTest {
-  private AutoReloadConfigDecorator autoReloadConfig;
-  private ReplicationQueue replicationQueueMock;
-  private WorkQueue workQueueMock;
-  private FakeExecutorService executorService = new FakeExecutorService();
-
-  public class FakeExecutorService implements ScheduledExecutorService {
-    public Runnable refreshCommand;
-
-    @Override
-    public void shutdown() {}
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      return null;
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return false;
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return false;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-      return false;
-    }
-
-    @Override
-    public <T> Future<T> submit(Callable<T> task) {
-      return null;
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable task, T result) {
-      return null;
-    }
-
-    @Override
-    public Future<?> submit(Runnable task) {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(
-        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException, ExecutionException {
-      return null;
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return null;
-    }
-
-    @Override
-    public void execute(Runnable command) {}
-
-    @Override
-    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleAtFixedRate(
-        Runnable command, long initialDelay, long period, TimeUnit unit) {
-      refreshCommand = command;
-      return null;
-    }
-
-    @Override
-    public ScheduledFuture<?> scheduleWithFixedDelay(
-        Runnable command, long initialDelay, long delay, TimeUnit unit) {
-      return null;
-    }
-  }
+  ReplicationFileBasedConfig replicationFileBasedConfig;
 
   public AutoReloadConfigDecoratorTest() throws IOException {
     super();
@@ -146,39 +38,7 @@
   public void setup() {
     super.setup();
 
-    setupMocks();
-  }
-
-  private void setupMocks() {
-    replicationQueueMock = createNiceMock(ReplicationQueue.class);
-    expect(replicationQueueMock.isRunning()).andReturn(true);
-    replay(replicationQueueMock);
-
-    workQueueMock = createNiceMock(WorkQueue.class);
-    expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService);
-    replay(workQueueMock);
-  }
-
-  @Test
-  public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception {
-    FileBasedConfig replicationConfig = newReplicationConfig();
-    String remoteName = "foo";
-    String remoteUrl = "ssh://git@git.somewhere.com/${name}";
-    replicationConfig.setString("remote", remoteName, "url", remoteUrl);
-    replicationConfig.save();
-
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
-    assertThat(destinations).hasSize(1);
-    assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
+    replicationFileBasedConfig = newReplicationFileBasedConfig();
   }
 
   @Test
@@ -190,17 +50,12 @@
     replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
     replicationConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
+    newAutoReloadConfig().start();
 
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(replicationFileBasedConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -212,7 +67,7 @@
     replicationConfig.save();
     executorService.refreshCommand.run();
 
-    destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
     assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
     assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
@@ -227,17 +82,10 @@
     replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
     replicationConfig.save();
 
-    autoReloadConfig =
-        new AutoReloadConfigDecorator(
-            sitePaths,
-            destinationFactoryMock,
-            Providers.of(replicationQueueMock),
-            pluginDataPath,
-            "replication",
-            workQueueMock);
-    autoReloadConfig.startup(workQueueMock);
-
-    List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(replicationFileBasedConfig);
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
 
@@ -247,6 +95,23 @@
     replicationConfig.save();
     executorService.refreshCommand.run();
 
-    assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations);
+    assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
+  }
+
+  private AutoReloadConfigDecorator newAutoReloadConfig() throws ConfigInvalidException {
+    AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            newDestinationsCollections(replicationFileBasedConfig),
+            replicationFileBasedConfig,
+            sitePaths,
+            pluginDataPath,
+            eventBus,
+            Providers.of(replicationQueueMock));
+    return new AutoReloadConfigDecorator(
+        "replication", workQueueMock, replicationFileBasedConfig, autoReloadRunnable, eventBus);
+  }
+
+  private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
new file mode 100644
index 0000000..b1aa7c8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.util.Providers;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoReloadRunnableTest {
+
+  private SitePaths sitePaths;
+  private EventBus eventBus;
+  private ReloadTrackerSubscriber onReloadSubscriber;
+  private String pluginName;
+  private ReplicationQueue replicationQueueMock;
+
+  @Before
+  public void setUp() throws IOException {
+    pluginName = "replication"; // must precede createTempFile, which uses it as the prefix
+    Path tmp = Files.createTempFile(pluginName, "_site");
+    Files.deleteIfExists(tmp); // keep only the unique path; the file itself is removed again
+    sitePaths = new SitePaths(tmp);
+    eventBus = new EventBus();
+    onReloadSubscriber = new ReloadTrackerSubscriber();
+    eventBus.register(onReloadSubscriber);
+
+    replicationQueueMock = mock(ReplicationQueue.class);
+    when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+  }
+
+  @Test
+  public void configurationIsReloadedWhenValidationSucceeds() {
+    ReplicationConfigValidator validator = new TestValidConfigurationListener();
+
+    attemptAutoReload(validator);
+
+    assertThat(onReloadSubscriber.reloaded).isTrue();
+  }
+
+  @Test
+  public void configurationIsNotReloadedWhenValidationFails() {
+    ReplicationConfigValidator validator = new TestInvalidConfigurationListener();
+
+    attemptAutoReload(validator);
+
+    assertThat(onReloadSubscriber.reloaded).isFalse();
+  }
+
+  private void attemptAutoReload(ReplicationConfigValidator validator) {
+    final AutoReloadRunnable autoReloadRunnable =
+        new AutoReloadRunnable(
+            validator,
+            newVersionConfig(),
+            sitePaths,
+            sitePaths.data_dir,
+            eventBus,
+            Providers.of(replicationQueueMock));
+
+    autoReloadRunnable.run();
+  }
+
+  private ReplicationFileBasedConfig newVersionConfig() {
+    return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
+      @Override
+      public String getVersion() { // NOTE(review): presumably makes every run see a change
+        return String.format("%s", System.nanoTime()); // current nanoTime rendered as text
+      }
+    };
+  }
+
+  private static class ReloadTrackerSubscriber {
+    public boolean reloaded = false; // stays false until onReload below is invoked
+
+    @Subscribe
+    public void onReload(
+        @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) {
+      reloaded = true;
+    }
+  }
+
+  private static class TestValidConfigurationListener implements ReplicationConfigValidator {
+    @Override
+    public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig) {
+      return Collections.emptyList();
+    }
+  }
+
+  private static class TestInvalidConfigurationListener implements ReplicationConfigValidator {
+    @Override
+    public List<RemoteConfiguration> validateConfig(
+        ReplicationFileBasedConfig configurationChangeEvent) throws ConfigInvalidException {
+      throw new ConfigInvalidException("expected test failure");
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
new file mode 100644
index 0000000..2f7059e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
@@ -0,0 +1,118 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeExecutorService implements ScheduledExecutorService {
+  public Runnable refreshCommand = () -> {};
+
+  @Override
+  public void shutdown() {}
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    return null;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return false;
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return null;
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return null;
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return null;
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return null;
+  }
+
+  @Override
+  public void execute(Runnable command) {}
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+    return null;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    refreshCommand = command; // captured, never scheduled; tests call refreshCommand.run()
+    return null; // this fake produces no ScheduledFuture
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    return null;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 0f6d629..2ee9a39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -14,11 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -36,14 +35,12 @@
 
   @Before
   public void setUp() throws Exception {
-    dispatcherMock = createMock(EventDispatcher.class);
-    replay(dispatcherMock);
+    dispatcherMock = mock(EventDispatcher.class);
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
   }
 
   @Test
   public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -51,9 +48,6 @@
             "someHost",
             RefPushResult.SUCCEEDED,
             RemoteRefUpdate.Status.OK);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -61,12 +55,11 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.SUCCEEDED,
         RemoteRefUpdate.Status.OK);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicatedEvent expectedEvent =
         new RefReplicatedEvent(
             "someProject",
@@ -74,9 +67,6 @@
             "someHost",
             RefPushResult.FAILED,
             RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToOneNode(
         "someProject",
@@ -84,19 +74,15 @@
         new URIish("git://someHost/someProject.git"),
         RefPushResult.FAILED,
         RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
   }
 
   @Test
   public void onAllNodesReplicated() throws PermissionBackendException {
-    reset(dispatcherMock);
     RefReplicationDoneEvent expectedDoneEvent =
         new RefReplicationDoneEvent("someProject", "refs/heads/master", 5);
-    dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent));
-    expectLastCall().once();
-    replay(dispatcherMock);
 
     gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5);
-    verify(dispatcherMock);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c010ddb..c09fcd1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,19 +14,18 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.permissions.PermissionBackend;
@@ -45,8 +44,6 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import junit.framework.AssertionFailedError;
-import org.easymock.IAnswer;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
@@ -68,6 +65,8 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class PushOneTest {
   private static final int TEST_PUSH_TIMEOUT_SECS = 10;
@@ -86,7 +85,7 @@
   private IdGenerator idGeneratorMock;
   private ReplicationStateListeners replicationStateListenersMock;
   private ReplicationMetrics replicationMetricsMock;
-  private Timer1.Context timerContextMock;
+  private Timer1.Context<String> timerContextMock;
   private ProjectCache projectCacheMock;
   private TransportFactory transportFactoryMock;
   private Transport transportMock;
@@ -108,7 +107,7 @@
 
   @Before
   public void setup() throws Exception {
-    projectNameKey = new Project.NameKey("fooProject");
+    projectNameKey = Project.nameKey("fooProject");
     urish = new URIish("http://foo.com/fooProject.git");
 
     newLocalRef =
@@ -126,6 +125,7 @@
     setupMocks();
   }
 
+  @SuppressWarnings("unchecked")
   private void setupMocks() throws Exception {
     FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
     config.setString("remote", "Replication", "push", "foo");
@@ -134,8 +134,8 @@
     setupRepositoryMock(config);
     setupGitRepoManagerMock();
 
-    projectStateMock = createNiceMock(ProjectState.class);
-    forProjectMock = createNiceMock(ForProject.class);
+    projectStateMock = mock(ProjectState.class);
+    forProjectMock = mock(ForProject.class);
     setupWithUserMock();
     setupPermissionBackedMock();
 
@@ -144,46 +144,22 @@
     setupRefSpecMock();
     setupRemoteConfigMock();
 
-    credentialsFactory = createNiceMock(CredentialsFactory.class);
+    credentialsFactory = mock(CredentialsFactory.class);
 
     setupFetchConnectionMock();
     setupPushConnectionMock();
     setupRequestScopeMock();
-    idGeneratorMock = createNiceMock(IdGenerator.class);
-    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+    idGeneratorMock = mock(IdGenerator.class);
+    replicationStateListenersMock = mock(ReplicationStateListeners.class);
 
-    timerContextMock = createNiceMock(Timer1.Context.class);
+    timerContextMock = mock(Timer1.Context.class);
     setupReplicationMetricsMock();
 
     setupTransportMock();
 
     setupProjectCacheMock();
 
-    replicationConfigMock = createNiceMock(ReplicationConfig.class);
-
-    replay(
-        gitRepositoryManagerMock,
-        refUpdateMock,
-        repositoryMock,
-        permissionBackendMock,
-        destinationMock,
-        remoteConfigMock,
-        credentialsFactory,
-        threadRequestScoperMock,
-        idGeneratorMock,
-        replicationStateListenersMock,
-        replicationMetricsMock,
-        projectCacheMock,
-        timerContextMock,
-        transportFactoryMock,
-        projectStateMock,
-        withUserMock,
-        forProjectMock,
-        fetchConnection,
-        pushConnection,
-        refSpecMock,
-        refDatabaseMock,
-        replicationConfigMock);
+    replicationConfigMock = mock(ReplicationConfig.class);
   }
 
   @Test
@@ -212,10 +188,7 @@
 
     PushResult pushResult = new PushResult();
 
-    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
-        .andReturn(pushResult)
-        .once();
-    replay(transportMock);
+    when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult);
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
@@ -223,8 +196,6 @@
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
-
-    verify(transportMock);
   }
 
   @Test
@@ -241,12 +212,6 @@
               }
             });
 
-    // easymock way to check if method was never called
-    expect(transportMock.push(anyObject(), anyObject()))
-        .andThrow(new AssertionFailedError())
-        .anyTimes();
-    replay(transportMock);
-
     PushOne pushOne = createPushOne(replicationPushFilter);
 
     pushOne.addRef(PushOne.ALL_REFS);
@@ -254,7 +219,7 @@
 
     isCallFinished.await(10, TimeUnit.SECONDS);
 
-    verify(transportMock);
+    verify(transportMock, never()).push(any(), any());
   }
 
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
@@ -281,31 +246,31 @@
   }
 
   private void setupProjectCacheMock() throws IOException {
-    projectCacheMock = createNiceMock(ProjectCache.class);
-    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+    projectCacheMock = mock(ProjectCache.class);
+    when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
   }
 
   private void setupTransportMock() throws NotSupportedException, TransportException {
-    transportMock = createNiceMock(Transport.class);
-    expect(transportMock.openFetch()).andReturn(fetchConnection);
-    transportFactoryMock = createNiceMock(TransportFactory.class);
-    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+    transportMock = mock(Transport.class);
+    when(transportMock.openFetch()).thenReturn(fetchConnection);
+    transportFactoryMock = mock(TransportFactory.class);
+    when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock);
   }
 
   private void setupReplicationMetricsMock() {
-    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
-    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+    replicationMetricsMock = mock(ReplicationMetrics.class);
+    when(replicationMetricsMock.start(any())).thenReturn(timerContextMock);
   }
 
   private void setupRequestScopeMock() {
-    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
-    expect(threadRequestScoperMock.scope(anyObject()))
-        .andAnswer(
-            new IAnswer<Callable<Object>>() {
+    threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class);
+    when(threadRequestScoperMock.scope(any()))
+        .thenAnswer(
+            new Answer<Callable<Object>>() {
               @SuppressWarnings("unchecked")
               @Override
-              public Callable<Object> answer() throws Throwable {
-                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+              public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
                 return new Callable<Object>() {
 
                   @Override
@@ -316,66 +281,64 @@
                   }
                 };
               }
-            })
-        .anyTimes();
+            });
   }
 
   private void setupPushConnectionMock() {
-    pushConnection = createNiceMock(PushConnection.class);
-    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+    pushConnection = mock(PushConnection.class);
+    when(pushConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupFetchConnectionMock() {
-    fetchConnection = createNiceMock(FetchConnection.class);
-    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+    fetchConnection = mock(FetchConnection.class);
+    when(fetchConnection.getRefsMap()).thenReturn(remoteRefs);
   }
 
   private void setupRemoteConfigMock() {
-    remoteConfigMock = createNiceMock(RemoteConfig.class);
-    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+    remoteConfigMock = mock(RemoteConfig.class);
+    when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock));
   }
 
   private void setupRefSpecMock() {
-    refSpecMock = createNiceMock(RefSpec.class);
-    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
-    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
-    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
-    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+    refSpecMock = mock(RefSpec.class);
+    when(refSpecMock.matchSource(any(String.class))).thenReturn(true);
+    when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock);
+    when(refSpecMock.getDestination()).thenReturn("fooProject");
+    when(refSpecMock.isForceUpdate()).thenReturn(false);
   }
 
   private void setupDestinationMock() {
-    destinationMock = createNiceMock(Destination.class);
-    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+    destinationMock = mock(Destination.class);
+    when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed());
   }
 
   private void setupPermissionBackedMock() {
-    permissionBackendMock = createNiceMock(PermissionBackend.class);
-    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+    permissionBackendMock = mock(PermissionBackend.class);
+    when(permissionBackendMock.currentUser()).thenReturn(withUserMock);
   }
 
   private void setupWithUserMock() {
-    withUserMock = createNiceMock(WithUser.class);
-    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+    withUserMock = mock(WithUser.class);
+    when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock);
   }
 
   private void setupGitRepoManagerMock() throws IOException {
-    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
-    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+    gitRepositoryManagerMock = mock(GitRepositoryManager.class);
+    when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock);
   }
 
   private void setupRepositoryMock(FileBasedConfig config) throws IOException {
-    repositoryMock = createNiceMock(Repository.class);
-    refDatabaseMock = createNiceMock(RefDatabase.class);
-    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
-    expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
-    expect(refDatabaseMock.getRefs()).andReturn(localRefs);
-    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+    repositoryMock = mock(Repository.class);
+    refDatabaseMock = mock(RefDatabase.class);
+    when(repositoryMock.getConfig()).thenReturn(config);
+    when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock);
+    when(refDatabaseMock.getRefs()).thenReturn(localRefs);
+    when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock);
   }
 
   private void setupRefUpdateMock() {
-    refUpdateMock = createNiceMock(RefUpdate.class);
-    expect(refUpdateMock.getOldObjectId())
-        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
-        .anyTimes();
+    refUpdateMock = mock(RefUpdate.class);
+    when(refUpdateMock.getOldObjectId())
+        .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
deleted file mode 100644
index 983e97f..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicatedEventEquals implements IArgumentMatcher {
-
-  private RefReplicatedEvent expected;
-
-  public RefReplicatedEventEquals(RefReplicatedEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicatedEvent)) {
-      return false;
-    }
-    RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedEvent.ref)) {
-      return false;
-    }
-    if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) {
-      return false;
-    }
-    if (!equals(expected.status, actualRefReplicatedEvent.status)) {
-      return false;
-    }
-    if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and targetNode \"");
-    buffer.append(expected.targetNode);
-    buffer.append("\" and status \"");
-    buffer.append(expected.status);
-    buffer.append("\" and refStatus \"");
-    buffer.append(expected.refStatus);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
deleted file mode 100644
index d1284e1..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicationDoneEventEquals implements IArgumentMatcher {
-
-  private RefReplicationDoneEvent expected;
-
-  public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) {
-    this.expected = expected;
-  }
-
-  public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) {
-    EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent));
-    return null;
-  }
-
-  @Override
-  public boolean matches(Object actual) {
-    if (!(actual instanceof RefReplicationDoneEvent)) {
-      return false;
-    }
-    RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual;
-    if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) {
-      return false;
-    }
-    if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) {
-      return false;
-    }
-    if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) {
-      return false;
-    }
-    return true;
-  }
-
-  private static boolean equals(Object object1, Object object2) {
-    if (object1 == object2) {
-      return true;
-    }
-    if (object1 != null && !object1.equals(object2)) {
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("eqEvent(");
-    buffer.append(expected.getClass().getName());
-    buffer.append(" with project \"");
-    buffer.append(expected.project);
-    buffer.append("\" and ref \"");
-    buffer.append(expected.ref);
-    buffer.append("\" and nodesCount \"");
-    buffer.append(expected.nodesCount);
-    buffer.append("\")");
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
deleted file mode 100644
index 111a792..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import java.util.Collection;
-import java.util.Objects;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.eclipse.jgit.transport.RemoteRefUpdate;
-
-public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
-  Collection<RemoteRefUpdate> expectedRemoteRefs;
-
-  public static Collection<RemoteRefUpdate> eqRemoteRef(
-      Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
-    return null;
-  }
-
-  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
-    this.expectedRemoteRefs = expectedRemoteRefs;
-  }
-
-  @Override
-  public boolean matches(Object argument) {
-    if (!(argument instanceof Collection)) return false;
-
-    @SuppressWarnings("unchecked")
-    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
-
-    if (expectedRemoteRefs.size() != refs.size()) return false;
-    return refs.stream()
-        .allMatch(
-            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
-  }
-
-  @Override
-  public void appendTo(StringBuffer buffer) {
-    buffer.append("expected:" + expectedRemoteRefs.toString());
-  }
-
-  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
-    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
-        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
-        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
-        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
-        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
-        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
-        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
-        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index 36cc209..f2b027c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.io.IOException;
 import java.util.List;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.junit.Test;
 
@@ -37,8 +36,10 @@
     config.setString("remote", remoteName, "url", remoteUrl);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(1);
 
     assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
@@ -55,18 +56,19 @@
     config.setString("remote", remoteName2, "url", remoteUrl2);
     config.save();
 
-    ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
-    List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+    DestinationsCollection destinationsCollections =
+        newDestinationsCollections(newReplicationFileBasedConfig());
+    destinationsCollections.startup(workQueueMock);
+    List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
     assertThat(destinations).hasSize(2);
 
     assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
     assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
   }
 
-  private ReplicationFileBasedConfig newReplicationFileBasedConfig()
-      throws ConfigInvalidException, IOException {
+  private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
     ReplicationFileBasedConfig replicationConfig =
-        new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath);
+        new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
     return replicationConfig;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a6a8ac8..19269f0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -24,10 +24,10 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
-import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Key;
@@ -100,7 +100,7 @@
 
     assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
 
-    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+    waitUntil(() -> projectExists(Project.nameKey(sourceProject + "replica.git")));
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -115,7 +115,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
 
@@ -164,7 +164,7 @@
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
 
@@ -267,10 +267,10 @@
     setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     assertThrows(
         InterruptedException.class,
@@ -293,10 +293,10 @@
     reloadConfig();
 
     Result pushResult = createChange();
-    shutdownConfig();
+    shutdownDestinations();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -318,7 +318,7 @@
     replicationQueueStart();
 
     RevCommit sourceCommit = pushResult.getCommit();
-    String sourceRef = pushResult.getPatchSet().getRefName();
+    String sourceRef = pushResult.getPatchSet().refName();
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -357,6 +357,7 @@
     config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
 
@@ -365,11 +366,11 @@
   }
 
   private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().forceReload();
+    getAutoReloadConfigDecoratorInstance().reload();
   }
 
-  private void shutdownConfig() {
-    getAutoReloadConfigDecoratorInstance().shutdown();
+  private void shutdownDestinations() {
+    getInstance(DestinationsCollection.class).shutdown();
   }
 
   private void replicationQueueStart() {
@@ -398,7 +399,7 @@
 
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
+    return tasksStorage.listWaiting().stream()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..e0de577 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -15,11 +15,9 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.resetToDefault;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
 import java.net.URISyntaxException;
@@ -35,8 +33,7 @@
 
   @Before
   public void setUp() throws Exception {
-    pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
-    replay(pushResultProcessingMock);
+    pushResultProcessingMock = mock(PushResultProcessing.class);
     replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
@@ -53,52 +50,36 @@
 
   @Test
   public void shouldFireOneReplicationEventWhenNothingToReplicate() {
-    resetToDefault(pushResultProcessingMock);
-
-    // expected event
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected event
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://someHost/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.markAllPushTasksScheduled();
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://someHost1/someRepo.git");
     URIish uri2 = new URIish("git://someHost2/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "someRef");
     replicationState.increasePushTaskCount("someProject", "someRef");
@@ -107,33 +88,29 @@
         "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject",
+            "someRef",
+            uri2,
+            RefPushResult.FAILED,
+            RemoteRefUpdate.Status.NON_EXISTING);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
     URIish uri2 = new URIish("git://host2/someRepo.git");
     URIish uri3 = new URIish("git://host3/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref1");
@@ -151,24 +128,32 @@
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5);
   }
 
   @Test
   public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("project1", "ref1");
     replicationState.increasePushTaskCount("project2", "ref2");
@@ -177,25 +162,24 @@
         "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.notifyRefReplicated(
         "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test
   public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
       throws URISyntaxException {
-    resetToDefault(pushResultProcessingMock);
     URIish uri1 = new URIish("git://host1/someRepo.git");
 
-    // expected events
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToOneNode(
-        "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1);
-    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1);
-    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
-    replay(pushResultProcessingMock);
-
     // actual test
     replicationState.increasePushTaskCount("someProject", "ref1");
     replicationState.increasePushTaskCount("someProject", "ref2");
@@ -204,7 +188,17 @@
     replicationState.notifyRefReplicated(
         "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
     replicationState.markAllPushTasksScheduled();
-    verify(pushResultProcessingMock);
+
+    // expected events
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock)
+        .onRefReplicatedToOneNode(
+            "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1);
+    verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1);
+    verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
   }
 
   @Test