Merge branch 'stable-2.15' into stable-2.16

* stable-2.15:
  ReplicationQueue: Check nulls in firePendingEvents
  Log stack trace when an error occur while deleting event
  Append LF to the json string of persisted replication event
  Change default for the replicateOnStartup to false
  Don't lose ref-updated events on plugin restart

Change-Id: Icc855dd26ee9f8fb195435d8902404b364242940
diff --git a/BUILD b/BUILD
index 0a54c3e..50615d8 100644
--- a/BUILD
+++ b/BUILD
@@ -21,7 +21,10 @@
 
 junit_tests(
     name = "replication_tests",
-    srcs = glob(["src/test/java/**/*Test.java"]),
+    srcs = glob([
+        "src/test/java/**/*Test.java",
+        "src/test/java/**/*IT.java",
+    ]),
     tags = ["replication"],
     visibility = ["//visibility:public"],
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
@@ -32,7 +35,7 @@
 
 java_library(
     name = "replication_util",
-    testonly = 1,
+    testonly = True,
     srcs = glob(
         ["src/test/java/**/*.java"],
         exclude = ["src/test/java/**/*Test.java"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
new file mode 100644
index 0000000..acbf763
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.reviewdb.client.Project;
+
+public interface AdminApi {
+  public boolean createProject(Project.NameKey project, String head);
+
+  public boolean deleteProject(Project.NameKey project);
+
+  public boolean updateHead(Project.NameKey project, String newHead);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
new file mode 100644
index 0000000..de6e91e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Factory for creating an {@link AdminApi} instance for a remote URI. */
+public interface AdminApiFactory {
+  /**
+   * Create an {@link AdminApi} for the given remote URI.
+   *
+   * @param uri the remote URI.
+   * @return An API for the given remote URI, or {@code Optional.empty} if there is no appropriate
+   *     API for the URI.
+   */
+  Optional<AdminApi> create(URIish uri);
+
+  @Singleton
+  static class DefaultAdminApiFactory implements AdminApiFactory {
+    protected final SshHelper sshHelper;
+
+    @Inject
+    public DefaultAdminApiFactory(SshHelper sshHelper) {
+      this.sshHelper = sshHelper;
+    }
+
+    @Override
+    public Optional<AdminApi> create(URIish uri) {
+      if (isGerrit(uri)) {
+        return Optional.of(new GerritSshApi(sshHelper, uri));
+      } else if (!uri.isRemote()) {
+        return Optional.of(new LocalFS(uri));
+      } else if (isSSH(uri)) {
+        return Optional.of(new RemoteSsh(sshHelper, uri));
+      }
+      return Optional.empty();
+    }
+  }
+
+  static boolean isGerrit(URIish uri) {
+    String scheme = uri.getScheme();
+    return scheme != null && scheme.toLowerCase().equals("gerrit+ssh");
+  }
+
+  static boolean isSSH(URIish uri) {
+    if (!uri.isRemote()) {
+      return false;
+    }
+    String scheme = uri.getScheme();
+    if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+      return true;
+    }
+    if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index e73e049..02daa6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -13,35 +13,42 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.FileUtil;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Singleton
 public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final Logger log = LoggerFactory.getLogger(AutoReloadConfigDecorator.class);
-  private ReplicationFileBasedConfig currentConfig;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private volatile ReplicationFileBasedConfig currentConfig;
   private long currentConfigTs;
+  private long lastFailedConfigTs;
 
   private final SitePaths site;
-  private final WorkQueue workQueue;
-  private final DestinationFactory destinationFactory;
+  private final Destination.Factory destinationFactory;
   private final Path pluginDataDir;
+  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
+  // ReplicationConfig
+  private final Provider<ReplicationQueue> replicationQueue;
+
+  private volatile boolean shuttingDown;
 
   @Inject
   public AutoReloadConfigDecorator(
       SitePaths site,
-      WorkQueue workQueue,
-      DestinationFactory destinationFactory,
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
       @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
     this.site = site;
@@ -49,7 +56,7 @@
     this.pluginDataDir = pluginDataDir;
     this.currentConfig = loadConfig();
     this.currentConfigTs = getLastModified(currentConfig);
-    this.workQueue = workQueue;
+    this.replicationQueue = replicationQueue;
   }
 
   private static long getLastModified(ReplicationFileBasedConfig cfg) {
@@ -71,25 +78,42 @@
   }
 
   private void reloadIfNeeded() {
-    try {
-      if (isAutoReload()) {
-        long lastModified = getLastModified(currentConfig);
-        if (lastModified > currentConfigTs) {
-          ReplicationFileBasedConfig newConfig = loadConfig();
-          newConfig.startup(workQueue);
-          int discarded = currentConfig.shutdown();
+    reload(false);
+  }
 
-          this.currentConfig = newConfig;
-          this.currentConfigTs = lastModified;
-          log.info(
-              "Configuration reloaded: {} destinations, {} replication events discarded",
-              currentConfig.getDestinations(FilterType.ALL).size(),
-              discarded);
+  @VisibleForTesting
+  public void forceReload() {
+    reload(true);
+  }
+
+  private void reload(boolean force) {
+    if (force || isAutoReload()) {
+      ReplicationQueue queue = replicationQueue.get();
+
+      long lastModified = getLastModified(currentConfig);
+      try {
+        if (force
+            || (!shuttingDown
+                && lastModified > currentConfigTs
+                && lastModified > lastFailedConfigTs
+                && queue.isRunning()
+                && !queue.isReplaying())) {
+          queue.stop();
+          currentConfig = loadConfig();
+          currentConfigTs = lastModified;
+          lastFailedConfigTs = 0;
+          logger.atInfo().log(
+              "Configuration reloaded: %d destinations",
+              currentConfig.getDestinations(FilterType.ALL).size());
         }
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot reload replication configuration: keeping existing settings");
+        lastFailedConfigTs = lastModified;
+        return;
+      } finally {
+        queue.start();
       }
-    } catch (Exception e) {
-      log.error("Cannot reload replication configuration: keeping existing settings", e);
-      return;
     }
   }
 
@@ -113,13 +137,28 @@
     return currentConfig.getEventsDirectory();
   }
 
+  /* shutdown() cannot be set as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when enabling the drain of replication events before
+   * shutdown.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never hold waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
   @Override
-  public synchronized int shutdown() {
+  public int shutdown() {
+    this.shuttingDown = true;
     return currentConfig.shutdown();
   }
 
   @Override
   public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
     currentConfig.startup(workQueue);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
index f8737b6..29a7ee6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -16,18 +16,16 @@
 
 import static com.google.gerrit.common.FileUtil.lastModified;
 
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicReference;
 import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class AutoReloadSecureCredentialsFactoryDecorator implements CredentialsFactory {
-  private static final Logger log =
-      LoggerFactory.getLogger(AutoReloadSecureCredentialsFactoryDecorator.class);
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory;
   private volatile long secureCredentialsFactoryLoadTs;
@@ -58,13 +56,12 @@
         secureCredentialsFactory.compareAndSet(
             secureCredentialsFactory.get(), new SecureCredentialsFactory(site));
         secureCredentialsFactoryLoadTs = getSecureConfigLastEditTs();
-        log.info("secure.config reloaded as it was updated on the file system");
+        logger.atInfo().log("secure.config reloaded as it was updated on the file system");
       }
     } catch (Exception e) {
-      log.error(
+      logger.atSevere().withCause(e).log(
           "Unexpected error while trying to reload "
-              + "secure.config: keeping existing credentials",
-          e);
+              + "secure.config: keeping existing credentials");
     }
 
     return secureCredentialsFactory.get().create(remoteName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 0a06093..36960a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -24,9 +24,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.common.data.GroupReference;
-import com.google.gerrit.extensions.client.ProjectState;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -42,6 +40,7 @@
 import com.google.gerrit.server.account.GroupIncludeCache;
 import com.google.gerrit.server.account.ListGroupMembership;
 import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.WorkQueue;
@@ -50,15 +49,18 @@
 import com.google.gerrit.server.permissions.ProjectPermission;
 import com.google.gerrit.server.permissions.RefPermission;
 import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.PerRequestProjectControlCache;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.RequestContext;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
+import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -83,18 +85,25 @@
 
 public class Destination {
   private static final Logger repLog = ReplicationQueue.repLog;
+
+  public interface Factory {
+    Destination create(DestinationConfiguration config);
+  }
+
   private final ReplicationStateListener stateLog;
   private final Object stateLock = new Object();
   private final Map<URIish, PushOne> pending = new HashMap<>();
   private final Map<URIish, PushOne> inFlight = new HashMap<>();
   private final PushOne.Factory opFactory;
-  private final ProjectControl.Factory projectControlFactory;
   private final GitRepositoryManager gitManager;
   private final PermissionBackend permissionBackend;
+  private final Provider<CurrentUser> userProvider;
+  private final ProjectCache projectCache;
   private volatile ScheduledExecutorService pool;
   private final PerThreadRequestScope.Scoper threadScoper;
   private final DestinationConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Provider<ReplicationTasksStorage> replicationTasksStorage;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -112,23 +121,28 @@
     }
   }
 
+  @Inject
   protected Destination(
       Injector injector,
-      DestinationConfiguration cfg,
-      RemoteSiteUser.Factory replicationUserFactory,
       PluginUser pluginUser,
       GitRepositoryManager gitRepositoryManager,
       PermissionBackend permissionBackend,
+      Provider<CurrentUser> userProvider,
+      ProjectCache projectCache,
       GroupBackend groupBackend,
-      ReplicationStateListener stateLog,
+      ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
-    config = cfg;
+      DynamicItem<EventDispatcher> eventDispatcher,
+      Provider<ReplicationTasksStorage> rts,
+      @Assisted DestinationConfiguration cfg) {
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
     this.permissionBackend = permissionBackend;
+    this.userProvider = userProvider;
+    this.projectCache = projectCache;
     this.stateLog = stateLog;
-
+    this.replicationTasksStorage = rts;
+    config = cfg;
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
       ImmutableSet.Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
@@ -141,7 +155,7 @@
           repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
         }
       }
-      remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build()));
+      remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
     } else {
       remoteUser = pluginUser;
     }
@@ -153,7 +167,6 @@
               protected void configure() {
                 bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
                 bind(PerThreadRequestScope.Propagator.class);
-                bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
 
                 bind(Destination.class).toInstance(Destination.this);
                 bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
@@ -185,7 +198,6 @@
               }
             });
 
-    projectControlFactory = child.getInstance(ProjectControl.Factory.class);
     opFactory = child.getInstance(PushOne.Factory.class);
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
   }
@@ -245,15 +257,21 @@
     }
   }
 
-  private boolean shouldReplicate(ProjectControl ctl) throws PermissionBackendException {
-    if (!config.replicateHiddenProjects() && ctl.getProject().getState() == ProjectState.HIDDEN) {
+  private boolean shouldReplicate(ProjectState state, CurrentUser user)
+      throws PermissionBackendException {
+    if (!config.replicateHiddenProjects()
+        && state.getProject().getState()
+            == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
       return false;
     }
+
+    // Hidden projects(permitsRead = false) should only be accessible by the project owners.
+    // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also
+    // be allowed for other users).
+    ProjectPermission permissionToCheck =
+        state.statePermitsRead() ? ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG;
     try {
-      permissionBackend
-          .user(ctl.getUser())
-          .project(ctl.getProject().getNameKey())
-          .check(ProjectPermission.ACCESS);
+      permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
       return true;
     } catch (AuthException e) {
       return false;
@@ -268,8 +286,19 @@
               new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws NoSuchProjectException, PermissionBackendException {
-                  ProjectControl projectControl = controlFor(project);
-                  if (!shouldReplicate(projectControl)) {
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  if (!projectState.statePermitsRead()) {
+                    return false;
+                  }
+                  if (!shouldReplicate(projectState, userProvider.get())) {
                     return false;
                   }
                   if (PushOne.ALL_REFS.equals(ref)) {
@@ -277,7 +306,7 @@
                   }
                   try {
                     permissionBackend
-                        .user(projectControl.getUser())
+                        .user(userProvider.get())
                         .project(project)
                         .ref(ref)
                         .check(RefPermission.READ);
@@ -304,7 +333,16 @@
               new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws NoSuchProjectException, PermissionBackendException {
-                  return shouldReplicate(controlFor(project));
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  return shouldReplicate(projectState, userProvider.get());
                 }
               })
           .call();
@@ -354,21 +392,21 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = getPendingPush(uri);
-      if (e == null) {
-        e = opFactory.create(project, uri);
-        addRef(e, ref);
-        e.addState(ref, state);
+      PushOne task = getPendingPush(uri);
+      if (task == null) {
+        task = opFactory.create(project, uri);
+        addRef(task, ref);
+        task.addState(ref, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
-            pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        pending.put(uri, e);
-      } else if (!e.getRefs().contains(ref)) {
-        addRef(e, ref);
-        e.addState(ref, state);
+            pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        pending.put(uri, task);
+      } else if (!task.getRefs().contains(ref)) {
+        addRef(task, ref);
+        task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
     }
   }
 
@@ -492,10 +530,6 @@
     }
   }
 
-  ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException {
-    return projectControlFactory.controlFor(project);
-  }
-
   RunwayStatus requestRunway(PushOne op) {
     synchronized (stateLock) {
       if (op.wasCanceled()) {
@@ -511,12 +545,31 @@
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne op) {
+  void notifyFinished(PushOne task) {
     synchronized (stateLock) {
-      inFlight.remove(op.getURI());
+      inFlight.remove(task.getURI());
+      if (!task.wasCanceled()) {
+        for (String ref : task.getRefs()) {
+          if (!refHasPendingPush(task.getURI(), ref)) {
+            replicationTasksStorage
+                .get()
+                .delete(
+                    new ReplicateRefUpdate(
+                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+          }
+        }
+      }
     }
   }
 
+  private boolean refHasPendingPush(URIish opUri, String ref) {
+    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+  }
+
+  private boolean pushContainsRef(PushOne op, String ref) {
+    return op != null && op.getRefs().contains(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
@@ -649,6 +702,14 @@
     return config.getMaxRetries();
   }
 
+  public int getDrainQueueAttempts() {
+    return config.getDrainQueueAttempts();
+  }
+
+  public int getReplicationDelaySeconds() {
+    return config.getDelay() * 1000;
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index b2d0de2..f688cfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@
 public class DestinationConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
+  static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
 
   private final int delay;
   private final int rescheduleDelay;
   private final int retryDelay;
+  private final int drainQueueAttempts;
   private final int lockErrorMaxRetries;
   private final ImmutableList<String> adminUrls;
   private final int poolThreads;
@@ -50,6 +52,8 @@
     projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
     adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
     retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+    drainQueueAttempts =
+        Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
     poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
     authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
     lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
@@ -77,6 +81,10 @@
     return retryDelay;
   }
 
+  public int getDrainQueueAttempts() {
+    return drainQueueAttempts;
+  }
+
   public int getPoolThreads() {
     return poolThreads;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
deleted file mode 100644
index 83eab86..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import com.google.gerrit.common.EventDispatcher;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.GroupBackend;
-import com.google.gerrit.server.account.GroupIncludeCache;
-import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.permissions.PermissionBackend;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-
-@Singleton
-public class DestinationFactory {
-  private final Injector injector;
-  private final RemoteSiteUser.Factory replicationUserFactory;
-  private final PluginUser pluginUser;
-  private final GitRepositoryManager gitRepositoryManager;
-  private final PermissionBackend permissionBackend;
-  private final GroupBackend groupBackend;
-  private final ReplicationStateListener stateLog;
-  private final GroupIncludeCache groupIncludeCache;
-  private final DynamicItem<EventDispatcher> eventDispatcher;
-
-  @Inject
-  public DestinationFactory(
-      Injector injector,
-      RemoteSiteUser.Factory replicationUserFactory,
-      PluginUser pluginUser,
-      GitRepositoryManager gitRepositoryManager,
-      PermissionBackend permissionBackend,
-      GroupBackend groupBackend,
-      ReplicationStateListener stateLog,
-      GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
-    this.injector = injector;
-    this.replicationUserFactory = replicationUserFactory;
-    this.pluginUser = pluginUser;
-    this.gitRepositoryManager = gitRepositoryManager;
-    this.permissionBackend = permissionBackend;
-    this.groupBackend = groupBackend;
-    this.stateLog = stateLog;
-    this.groupIncludeCache = groupIncludeCache;
-    this.eventDispatcher = eventDispatcher;
-  }
-
-  Destination create(DestinationConfiguration config) {
-    return new Destination(
-        injector,
-        config,
-        replicationUserFactory,
-        pluginUser,
-        gitRepositoryManager,
-        permissionBackend,
-        groupBackend,
-        stateLog,
-        groupIncludeCache,
-        eventDispatcher);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
deleted file mode 100644
index c567876..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright (C) 2018 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.hash.Hashing;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.ProvisionException;
-import com.google.inject.Singleton;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import org.eclipse.jgit.lib.ObjectId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class EventsStorage {
-  private static final Logger log = LoggerFactory.getLogger(EventsStorage.class);
-
-  public static class ReplicateRefUpdate {
-    public String project;
-    public String ref;
-  }
-
-  private static Gson GSON = new Gson();
-
-  private final Path refUpdates;
-
-  @Inject
-  EventsStorage(ReplicationConfig config) {
-    refUpdates = config.getEventsDirectory().resolve("ref-updates");
-  }
-
-  public String persist(String project, String ref) {
-    ReplicateRefUpdate r = new ReplicateRefUpdate();
-    r.project = project;
-    r.ref = ref;
-
-    String json = GSON.toJson(r) + "\n";
-    String eventKey = sha1(json).name();
-    Path file = refUpdates().resolve(eventKey);
-
-    if (Files.exists(file)) {
-      return eventKey;
-    }
-
-    try {
-      Files.write(file, json.getBytes(UTF_8));
-    } catch (IOException e) {
-      log.warn("Couldn't persist event {}", json);
-    }
-    return eventKey;
-  }
-
-  public void delete(String eventKey) {
-    if (eventKey != null) {
-      try {
-        Files.delete(refUpdates().resolve(eventKey));
-      } catch (IOException e) {
-        log.error("Error while deleting event {}", eventKey, e);
-      }
-    }
-  }
-
-  public List<ReplicateRefUpdate> list() {
-    ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
-      for (Path e : events) {
-        if (Files.isRegularFile(e)) {
-          String json = new String(Files.readAllBytes(e), UTF_8);
-          result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
-          Files.delete(e);
-        }
-      }
-    } catch (IOException e) {
-      log.error("Error when firing pending events", e);
-    }
-    return result;
-  }
-
-  private ObjectId sha1(String s) {
-    return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
-  }
-
-  private Path refUpdates() {
-    try {
-      return Files.createDirectories(refUpdates);
-    } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index b46a0d9..ccd8bf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -14,33 +14,34 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
-import com.google.inject.Inject;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class GerritSshApi {
-  static int SSH_COMMAND_FAILED = -1;
-  private static final Logger log = LoggerFactory.getLogger(GerritSshApi.class);
-  private static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
+public class GerritSshApi implements AdminApi {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final SshHelper sshHelper;
+  static final int SSH_COMMAND_FAILED = -1;
+  private static final String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
+
+  protected final SshHelper sshHelper;
+  protected final URIish uri;
 
   private final Set<URIish> withoutDeleteProjectPlugin = new HashSet<>();
 
-  @Inject
-  protected GerritSshApi(SshHelper sshHelper) {
+  protected GerritSshApi(SshHelper sshHelper, URIish uri) {
     this.sshHelper = sshHelper;
+    this.uri = uri;
   }
 
-  protected boolean createProject(URIish uri, Project.NameKey projectName, String head) {
+  @Override
+  public boolean createProject(Project.NameKey projectName, String head) {
     OutputStream errStream = sshHelper.newErrorBufferStream();
     String cmd = "gerrit create-project --branch " + head + " " + projectName.get();
     try {
@@ -52,7 +53,8 @@
     return true;
   }
 
-  protected boolean deleteProject(URIish uri, Project.NameKey projectName) {
+  @Override
+  public boolean deleteProject(Project.NameKey projectName) {
     if (!withoutDeleteProjectPlugin.contains(uri)) {
       OutputStream errStream = sshHelper.newErrorBufferStream();
       String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get();
@@ -64,30 +66,23 @@
         return false;
       }
       if (exitCode == 1) {
-        log.info(
-            "DeleteProject plugin is not installed on {}; will not try to forward this operation to that host");
+        logger.atInfo().log(
+            "DeleteProject plugin is not installed on %s;"
+                + " will not try to forward this operation to that host");
         withoutDeleteProjectPlugin.add(uri);
-        return true;
       }
     }
     return true;
   }
 
-  protected boolean updateHead(URIish uri, Project.NameKey projectName, String newHead) {
+  @Override
+  public boolean updateHead(Project.NameKey projectName, String newHead) {
     OutputStream errStream = sshHelper.newErrorBufferStream();
     String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead;
     try {
       execute(uri, cmd, errStream);
     } catch (IOException e) {
-      log.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n  Command: {}\n  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream,
-          e);
+      logError("updating HEAD of", uri, errStream, cmd, e);
       return false;
     }
     return true;
@@ -114,19 +109,14 @@
       URIish sshUri = toSshUri(uri);
       return sshHelper.executeRemoteSsh(sshUri, cmd, errStream);
     } catch (URISyntaxException e) {
-      log.error("Cannot convert {} to SSH uri", uri, e);
+      logger.atSevere().withCause(e).log("Cannot convert %s to SSH uri", uri);
     }
     return SSH_COMMAND_FAILED;
   }
 
   public void logError(String msg, URIish uri, OutputStream errStream, String cmd, IOException e) {
-    log.error(
-        "Error {} remote repository at {}:\n  Exception: {}\n  Command: {}\n  Output: {}",
-        msg,
-        uri,
-        e,
-        cmd,
-        errStream,
-        e);
+    logger.atSevere().withCause(e).log(
+        "Error %s remote repository at %s:\n  Exception: %s\n  Command: %s\n  Output: %s",
+        msg, uri, e, cmd, errStream);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index fa17dce..07978ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -72,7 +72,7 @@
   }
 
   private void addQueueDetails(JsonObject obj, String key, Collection<PushOne> values) {
-    if (values.size() > 0) {
+    if (!values.isEmpty()) {
       JsonArray list = new JsonArray();
       for (PushOne p : values) {
         list.add(new JsonPrimitive(p.toString()));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
new file mode 100644
index 0000000..aa6e16c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.File;
+import java.io.IOException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+
+public class LocalFS implements AdminApi {
+
+  private final URIish uri;
+
+  public LocalFS(URIish uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      repo.create(true /* bare */);
+
+      if (head != null && head.startsWith(Constants.R_REFS)) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.disableRefLog();
+        u.link(head);
+      }
+      repLog.info("Created local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error creating local repository {}", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    try {
+      recursivelyDelete(new File(uri.getPath()));
+      repLog.info("Deleted local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      if (newHead != null) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.link(newHead);
+      }
+    } catch (IOException e) {
+      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      return false;
+    }
+    return true;
+  }
+
+  private static void recursivelyDelete(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (File d : contents) {
+        if (d.isDirectory()) {
+          recursivelyDelete(d);
+        } else {
+          if (!d.delete()) {
+            throw new IOException("Failed to delete: " + d.getAbsolutePath());
+          }
+        }
+      }
+    }
+    if (!dir.delete()) {
+      throw new IOException("Failed to delete: " + dir.getAbsolutePath());
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 2d60d86..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -15,13 +15,12 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.util.concurrent.Atomics;
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
-import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -30,36 +29,27 @@
   private final AtomicReference<Future<?>> pushAllFuture;
   private final ServerInformation srvInfo;
   private final PushAll.Factory pushAll;
-  private final ReplicationQueue queue;
   private final ReplicationConfig config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
-  private final Factory replicationStateFactory;
 
   @Inject
   protected OnStartStop(
       ServerInformation srvInfo,
       PushAll.Factory pushAll,
-      ReplicationQueue queue,
       ReplicationConfig config,
-      DynamicItem<EventDispatcher> eventDispatcher,
-      ReplicationState.Factory replicationStateFactory) {
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.srvInfo = srvInfo;
     this.pushAll = pushAll;
-    this.queue = queue;
     this.config = config;
     this.eventDispatcher = eventDispatcher;
-    this.replicationStateFactory = replicationStateFactory;
     this.pushAllFuture = Atomics.newReference();
   }
 
   @Override
   public void start() {
-    queue.start();
-
     if (srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
-      ReplicationState state =
-          replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
+      ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
       pushAllFuture.set(
           pushAll
               .create(null, ReplicationFilter.all(), state, false)
@@ -73,6 +63,5 @@
     if (f != null) {
       f.cancel(true);
     }
-    queue.stop();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index db067e2..833b02b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -43,7 +43,7 @@
       WorkQueue wq,
       ProjectCache projectCache,
       ReplicationQueue rq,
-      ReplicationStateListener stateLog,
+      ReplicationStateListeners stateLog,
       @Assisted @Nullable String urlMatch,
       @Assisted ReplicationFilter filter,
       @Assisted ReplicationState state,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 5f0c066..5794f6e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,20 +22,23 @@
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
-import com.google.gerrit.server.git.VisibleRefFilter;
 import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
+import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.permissions.ProjectPermission;
-import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -92,7 +95,6 @@
   private final RemoteConfig config;
   private final CredentialsProvider credentialsProvider;
   private final PerThreadRequestScope.Scoper threadScoper;
-  private final VisibleRefFilter.Factory refFilterFactory;
   private final ReplicationQueue replicationQueue;
 
   private final Project.NameKey projectName;
@@ -110,7 +112,10 @@
   private final int id;
   private final long createdAt;
   private final ReplicationMetrics metrics;
+  private final ProjectCache projectCache;
   private final AtomicBoolean canceledWhileRunning;
+  private final TransportFactory transportFactory;
+  private DynamicItem<ReplicationPushFilter> replicationPushFilter;
 
   @Inject
   PushOne(
@@ -118,20 +123,20 @@
       PermissionBackend permissionBackend,
       Destination p,
       RemoteConfig c,
-      VisibleRefFilter.Factory rff,
       CredentialsFactory cpFactory,
       PerThreadRequestScope.Scoper ts,
       ReplicationQueue rq,
       IdGenerator ig,
-      ReplicationStateListener sl,
+      ReplicationStateListeners sl,
       ReplicationMetrics m,
+      ProjectCache pc,
+      TransportFactory tf,
       @Assisted Project.NameKey d,
       @Assisted URIish u) {
     gitManager = grm;
     this.permissionBackend = permissionBackend;
     pool = p;
     config = c;
-    refFilterFactory = rff;
     credentialsProvider = cpFactory.create(c.getName());
     threadScoper = ts;
     replicationQueue = rq;
@@ -143,13 +148,20 @@
     stateLog = sl;
     createdAt = System.nanoTime();
     metrics = m;
+    projectCache = pc;
     canceledWhileRunning = new AtomicBoolean(false);
     maxRetries = p.getMaxRetries();
+    transportFactory = tf;
+  }
+
+  @Inject(optional = true)
+  public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    this.replicationPushFilter = replicationPushFilter;
   }
 
   @Override
   public void cancel() {
-    repLog.info("Replication [{}] to {} was canceled", IdGenerator.format(id), getURI());
+    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
@@ -158,7 +170,7 @@
   public void setCanceledWhileRunning() {
     repLog.info(
         "Replication [{}] to {} was canceled while being executed",
-        IdGenerator.format(id),
+        HexFormat.fromInt(id),
         getURI());
     canceledWhileRunning.set(true);
   }
@@ -180,7 +192,7 @@
 
   @Override
   public String toString() {
-    String print = "[" + IdGenerator.format(id) + "] push " + uri;
+    String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
 
     if (retryCount > 0) {
       print = "(retry " + retryCount + ") " + print;
@@ -301,7 +313,7 @@
     // we start replication (instead a new instance, with the same URI, is
     // created and scheduled for a future point in time.)
     //
-    MDC.put(ID_MDC_KEY, IdGenerator.format(id));
+    MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
     RunwayStatus status = pool.requestRunway(this);
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
@@ -310,7 +322,7 @@
         repLog.info(
             "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
             uri,
-            IdGenerator.format(status.getInFlightPushId()));
+            HexFormat.fromInt(status.getInFlightPushId()));
         pool.reschedule(this, Destination.RetryReason.COLLISION);
       }
       return;
@@ -407,15 +419,12 @@
     if (pool.isCreateMissingRepos()) {
       try {
         Ref head = git.exactRef(Constants.HEAD);
-        if (replicationQueue.createProject(projectName, head != null ? getName(head) : null)) {
+        if (replicationQueue.createProject(
+            config.getName(), projectName, head != null ? getName(head) : null)) {
           repLog.warn("Missing repository created; retry replication to {}", uri);
           pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
         } else {
-          repLog.warn(
-              "Missing repository could not be created when replicating {}. "
-                  + "You can only create missing repositories locally, over SSH or when "
-                  + "using adminUrl in replication.config. See documentation for more information.",
-              uri);
+          repLog.warn("Missing repository could not be created when replicating {}", uri);
         }
       } catch (IOException ioe) {
         stateLog.error(
@@ -438,7 +447,7 @@
 
   private void runImpl() throws IOException, PermissionBackendException {
     PushResult res;
-    try (Transport tn = Transport.open(git, uri)) {
+    try (Transport tn = transportFactory.open(git, uri)) {
       res = pushVia(tn);
     }
     updateStates(res.getRemoteUpdates());
@@ -465,19 +474,19 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectControl pc;
-    try {
-      pc = pool.controlFor(projectName);
-    } catch (NoSuchProjectException e) {
+    ProjectState projectState = projectCache.checkedGet(projectName);
+    if (projectState == null) {
       return Collections.emptyList();
     }
 
     Map<String, Ref> local = git.getAllRefs();
     boolean filter;
+    PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      permissionBackend.user(pc.getUser()).project(projectName).check(ProjectPermission.READ);
+      projectState.checkStatePermitsRead();
+      forProject.check(ProjectPermission.READ);
       filter = false;
-    } catch (AuthException e) {
+    } catch (AuthException | ResourceConflictException e) {
       filter = true;
     }
     if (filter) {
@@ -494,10 +503,15 @@
         }
         local = n;
       }
-      local = refFilterFactory.create(pc.getProjectState(), git).filter(local, true);
+      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
     }
 
-    return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+    List<RemoteRefUpdate> remoteUpdatesList =
+        pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+
+    return replicationPushFilter == null || replicationPushFilter.get() == null
+        ? remoteUpdatesList
+        : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
   }
 
   private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index c71a792..ae0662d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -14,7 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.common.EventDispatcher;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.RefEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.server.OrmException;
@@ -23,8 +24,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class PushResultProcessing {
 
@@ -159,7 +158,7 @@
   }
 
   public static class GitUpdateProcessing extends PushResultProcessing {
-    private static final Logger log = LoggerFactory.getLogger(GitUpdateProcessing.class);
+    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
     private final EventDispatcher dispatcher;
 
@@ -189,7 +188,7 @@
       try {
         dispatcher.postEvent(event);
       } catch (OrmException | PermissionBackendException e) {
-        log.error("Cannot post event", e);
+        logger.atSevere().withCause(e).log("Cannot post event");
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
index 91fce7f..c3556af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
@@ -16,18 +16,11 @@
 
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.account.GroupMembership;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 
 public class RemoteSiteUser extends CurrentUser {
-  public interface Factory {
-    RemoteSiteUser create(@Assisted GroupMembership authGroups);
-  }
-
   private final GroupMembership effectiveGroups;
 
-  @Inject
-  RemoteSiteUser(@Assisted GroupMembership authGroups) {
+  public RemoteSiteUser(GroupMembership authGroups) {
     effectiveGroups = authGroups;
   }
 
@@ -35,4 +28,10 @@
   public GroupMembership getEffectiveGroups() {
     return effectiveGroups;
   }
+
+  @Override
+  public Object getCacheKey() {
+    // Never cache a remote user
+    return new Object();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
new file mode 100644
index 0000000..ee9d4c0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.QuotedString;
+
+public class RemoteSsh implements AdminApi {
+
+  private final SshHelper sshHelper;
+  private URIish uri;
+
+  RemoteSsh(SshHelper sshHelper, URIish uri) {
+    this.sshHelper = sshHelper;
+    this.uri = uri;
+  }
+
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
+    if (head != null) {
+      cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
+    }
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Created remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error creating remote repository at {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "rm -rf " + quotedPath;
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Deleted remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error deleting remote repository at {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd =
+        "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+    } catch (IOException e) {
+      repLog.error(
+          "Error updating HEAD of remote repository at {} to {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          newHead,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
new file mode 100644
index 0000000..b92a54a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+/**
+ * Gerrit libModule for applying a ref-filter for outgoing replications.
+ *
+ * <p>It should be used only when an actual filter is defined, otherwise the default replication
+ * plugin behaviour will be pushing all refs without any filtering.
+ */
+public class ReplicationExtensionPointModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.itemOf(binder(), ReplicationPushFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index d68505c..683fb9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
@@ -40,12 +41,10 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationFileBasedConfig.class);
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
   private List<Destination> destinations;
@@ -60,7 +59,7 @@
 
   @Inject
   public ReplicationFileBasedConfig(
-      SitePaths site, DestinationFactory destinationFactory, @PluginData Path pluginDataDir)
+      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
     this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
@@ -93,14 +92,14 @@
     return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
   }
 
-  private List<Destination> allDestinations(DestinationFactory destinationFactory)
+  private List<Destination> allDestinations(Destination.Factory destinationFactory)
       throws ConfigInvalidException, IOException {
     if (!config.getFile().exists()) {
-      log.warn("Config file {} does not exist; not replicating", config.getFile());
+      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
       return Collections.emptyList();
     }
     if (config.getFile().length() == 0) {
-      log.info("Config file {} is empty; not replicating", config.getFile());
+      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
       return Collections.emptyList();
     }
 
@@ -192,7 +191,7 @@
         result.add(new RemoteConfig(cfg, name));
       } catch (URISyntaxException e) {
         throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
+            String.format("remote %s has invalid URL in %s", name, cfg.getFile()), e);
       }
     }
     return result;
@@ -223,11 +222,54 @@
   public int shutdown() {
     int discarded = 0;
     for (Destination cfg : destinations) {
-      discarded += cfg.shutdown();
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
     }
     return discarded;
   }
 
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        Thread.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
   FileBasedConfig getConfig() {
     return config;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 7b3486b..05bbb03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
+import com.google.gerrit.reviewdb.client.Project;
 import java.util.Collections;
 import java.util.List;
 
@@ -46,7 +46,7 @@
     projectPatterns = patterns;
   }
 
-  public boolean matches(NameKey name) {
+  public boolean matches(Project.NameKey name) {
     if (projectPatterns.isEmpty()) {
       return true;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 5e7c978..5fdb375 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -21,8 +21,8 @@
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
@@ -34,11 +34,13 @@
 class ReplicationModule extends AbstractModule {
   @Override
   protected void configure() {
-    bind(DestinationFactory.class).in(Scopes.SINGLETON);
+    install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(LifecycleListener.class)
+        .annotatedWith(UniqueAnnotations.create())
+        .to(ReplicationQueue.class);
 
     DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
-    DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
 
@@ -55,15 +57,20 @@
         .to(StartReplicationCapability.class);
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
-    install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
-    install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
 
     bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
-    bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
+    DynamicSet.setOf(binder(), ReplicationStateListener.class);
+    DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
     EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class);
     EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class);
     EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class);
     bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class);
+
+    DynamicItem.itemOf(binder(), AdminApiFactory.class);
+    DynamicItem.bind(binder(), AdminApiFactory.class)
+        .to(AdminApiFactory.DefaultAdminApiFactory.class);
+
+    bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
new file mode 100644
index 0000000..eb6ba90
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * Filter that is invoked before list of remote ref updates is pushed to remote instance.
+ *
+ * <p>It can be used to filter out unwanted updates.
+ */
+@ExtensionPoint
+public interface ReplicationPushFilter {
+
+  public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index da5a4d2..d73ab7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,34 +14,33 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import com.google.gerrit.common.EventDispatcher;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
-import org.eclipse.jgit.internal.storage.file.FileRepository;
-import org.eclipse.jgit.lib.Constants;
-import org.eclipse.jgit.lib.RefUpdate;
-import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.URIish;
-import org.eclipse.jgit.util.QuotedString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +48,6 @@
 public class ReplicationQueue
     implements LifecycleListener,
         GitReferenceUpdatedListener,
-        NewProjectCreatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
   static final String REPLICATION_LOG_NAME = "replication_log";
@@ -70,42 +68,39 @@
   }
 
   private final WorkQueue workQueue;
-  private final SshHelper sshHelper;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final ReplicationConfig config;
-  private final GerritSshApi gerritAdmin;
-  private final ReplicationState.Factory replicationStateFactory;
-  private final EventsStorage eventsStorage;
+  private final DynamicItem<AdminApiFactory> adminApiFactory;
+  private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
-  private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
+  private volatile boolean replaying;
+  private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
 
   @Inject
   ReplicationQueue(
       WorkQueue wq,
-      SshHelper sh,
-      GerritSshApi ga,
+      DynamicItem<AdminApiFactory> aaf,
       ReplicationConfig rc,
       DynamicItem<EventDispatcher> dis,
-      ReplicationStateListener sl,
-      ReplicationState.Factory rsf,
-      EventsStorage es) {
+      ReplicationStateListeners sl,
+      ReplicationTasksStorage rts) {
     workQueue = wq;
-    sshHelper = sh;
     dispatcher = dis;
     config = rc;
     stateLog = sl;
-    gerritAdmin = ga;
-    replicationStateFactory = rsf;
-    eventsStorage = es;
+    adminApiFactory = aaf;
+    replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
   }
 
   @Override
   public void start() {
-    config.startup(workQueue);
-    running = true;
-    firePendingEvents();
-    fireBeforeStartupEvents();
+    if (!running) {
+      config.startup(workQueue);
+      running = true;
+      firePendingEvents();
+      fireBeforeStartupEvents();
+    }
   }
 
   @Override
@@ -117,11 +112,20 @@
     }
   }
 
+  public boolean isRunning() {
+    return running;
+  }
+
+  public boolean isReplaying() {
+    return replaying;
+  }
+
   void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
     scheduleFullSync(project, urlMatch, state, false);
   }
 
-  void scheduleFullSync(
+  @VisibleForTesting
+  public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
     if (!running) {
       stateLog.warn("Replication plugin did not finish startup before event", state);
@@ -132,6 +136,9 @@
       if (cfg.wouldPushProject(project)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
           cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(
+                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
         }
       }
     }
@@ -139,28 +146,25 @@
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    ReplicationState state =
-        replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
-    if (!running) {
-      stateLog.warn(
-          "Replication plugin did not finish startup before event, event replication is postponed",
-          state);
-      beforeStartupEventsQueue.add(event);
-      return;
-    }
     onGitReferenceUpdated(event.getProjectName(), event.getRefName());
   }
 
   private void onGitReferenceUpdated(String projectName, String refName) {
-    ReplicationState state =
-        replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    if (!running) {
+      stateLog.warn(
+          "Replication plugin did not finish startup before event, event replication is postponed",
+          state);
+      beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
+      return;
+    }
 
     Project.NameKey project = new Project.NameKey(projectName);
     for (Destination cfg : config.getDestinations(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        String eventKey = eventsStorage.persist(projectName, refName);
-        state.setEventKey(eventKey);
         for (URIish uri : cfg.getURIs(project, null)) {
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
           cfg.schedule(project, refName, uri, state);
         }
       }
@@ -169,28 +173,26 @@
   }
 
   private void firePendingEvents() {
-    for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
-      if (e == null) {
-        repLog.warn("Encountered null replication event in EventsStorage");
-      } else {
-        repLog.info("Firing pending event {}", e);
-        onGitReferenceUpdated(e.project, e.ref);
+    try {
+      Set<String> eventsReplayed = new HashSet<>();
+      replaying = true;
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+        String eventKey = String.format("%s:%s", t.project, t.ref);
+        if (!eventsReplayed.contains(eventKey)) {
+          repLog.info("Firing pending task {}", eventKey);
+          onGitReferenceUpdated(t.project, t.ref);
+          eventsReplayed.add(eventKey);
+        }
       }
-    }
-  }
-
-  @Override
-  public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
-    Project.NameKey projectName = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(projectName, FilterType.PROJECT_CREATION)) {
-      createProject(uri, projectName, event.getHeadName());
+    } finally {
+      replaying = false;
     }
   }
 
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey projectName = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(projectName, FilterType.PROJECT_DELETION)) {
+    for (URIish uri : getURIs(null, projectName, FilterType.PROJECT_DELETION)) {
       deleteProject(uri, projectName);
     }
   }
@@ -198,24 +200,25 @@
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
     Project.NameKey project = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(project, FilterType.ALL)) {
+    for (URIish uri : getURIs(null, project, FilterType.ALL)) {
       updateHead(uri, project, event.getNewHeadName());
     }
   }
 
   private void fireBeforeStartupEvents() {
     Set<String> eventsReplayed = new HashSet<>();
-    for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) {
+    for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
       String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event);
+        onGitReferenceUpdated(event.getProjectName(), event.getRefName());
         eventsReplayed.add(eventKey);
       }
     }
   }
 
-  private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) {
+  private Set<URIish> getURIs(
+      @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
     if (config.getDestinations(filterType).isEmpty()) {
       return Collections.emptySet();
     }
@@ -230,6 +233,10 @@
         continue;
       }
 
+      if (remoteName != null && !config.getRemoteConfigName().equals(remoteName)) {
+        continue;
+      }
+
       boolean adminURLUsed = false;
 
       for (String url : config.getAdminUrls()) {
@@ -272,201 +279,75 @@
     return uris;
   }
 
-  public boolean createProject(Project.NameKey project, String head) {
+  public boolean createProject(String remoteName, Project.NameKey project, String head) {
     boolean success = true;
-    for (URIish uri : getURIs(project, FilterType.PROJECT_CREATION)) {
+    for (URIish uri : getURIs(remoteName, project, FilterType.PROJECT_CREATION)) {
       success &= createProject(uri, project, head);
     }
     return success;
   }
 
   private boolean createProject(URIish replicateURI, Project.NameKey projectName, String head) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.createProject(replicateURI, projectName, head);
-    } else if (!replicateURI.isRemote()) {
-      createLocally(replicateURI, head);
-    } else if (isSSH(replicateURI)) {
-      createRemoteSsh(replicateURI, head);
-    } else {
-      repLog.warn(
-          "Cannot create new project on remote site {}."
-              + " Only local paths and SSH URLs are supported"
-              + " for remote repository creation",
-          replicateURI);
-      return false;
-    }
-    return true;
-  }
-
-  private static void createLocally(URIish uri, String head) {
-    try (Repository repo = new FileRepository(uri.getPath())) {
-      repo.create(true /* bare */);
-
-      if (head != null && head.startsWith(Constants.R_REFS)) {
-        RefUpdate u = repo.updateRef(Constants.HEAD);
-        u.disableRefLog();
-        u.link(head);
-      }
-      repLog.info("Created local repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error("Error creating local repository {}:\n", uri.getPath(), e);
-    }
-  }
-
-  private void createRemoteSsh(URIish uri, String head) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
-    if (head != null) {
-      cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
-    }
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Created remote repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error(
-          "Error creating remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.deleteProject(replicateURI, projectName);
-      repLog.info("Deleted remote repository: " + replicateURI);
-    } else if (!replicateURI.isRemote()) {
-      deleteLocally(replicateURI);
-    } else if (isSSH(replicateURI)) {
-      deleteRemoteSsh(replicateURI);
-    } else {
-      repLog.warn(
-          "Cannot delete project on remote site {}. "
-              + "Only local paths and SSH URLs are supported"
-              + " for remote repository deletion",
-          replicateURI);
-    }
-  }
-
-  private static void deleteLocally(URIish uri) {
-    try {
-      recursivelyDelete(new File(uri.getPath()));
-      repLog.info("Deleted local repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
-    }
-  }
-
-  private static void recursivelyDelete(File dir) throws IOException {
-    File[] contents = dir.listFiles();
-    if (contents != null) {
-      for (File d : contents) {
-        if (d.isDirectory()) {
-          recursivelyDelete(d);
-        } else {
-          if (!d.delete()) {
-            throw new IOException("Failed to delete: " + d.getAbsolutePath());
-          }
-        }
-      }
-    }
-    if (!dir.delete()) {
-      throw new IOException("Failed to delete: " + dir.getAbsolutePath());
-    }
-  }
-
-  private void deleteRemoteSsh(URIish uri) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd = "rm -rf " + quotedPath;
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Deleted remote repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error(
-          "Error deleting remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.updateHead(replicateURI, projectName, newHead);
-    } else if (!replicateURI.isRemote()) {
-      updateHeadLocally(replicateURI, newHead);
-    } else if (isSSH(replicateURI)) {
-      updateHeadRemoteSsh(replicateURI, newHead);
-    } else {
-      repLog.warn(
-          "Cannot update HEAD of project on remote site {}."
-              + " Only local paths and SSH URLs are supported"
-              + " for remote HEAD update.",
-          replicateURI);
-    }
-  }
-
-  private void updateHeadRemoteSsh(URIish uri, String newHead) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd =
-        "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-    } catch (IOException e) {
-      repLog.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private static void updateHeadLocally(URIish uri, String newHead) {
-    try (Repository repo = new FileRepository(uri.getPath())) {
-      if (newHead != null) {
-        RefUpdate u = repo.updateRef(Constants.HEAD);
-        u.link(newHead);
-      }
-    } catch (IOException e) {
-      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
-    }
-  }
-
-  private static boolean isSSH(URIish uri) {
-    String scheme = uri.getScheme();
-    if (!uri.isRemote()) {
-      return false;
-    }
-    if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent() && adminApi.get().createProject(projectName, head)) {
       return true;
     }
-    if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
-      return true;
-    }
+
+    warnCannotPerform("create new project", replicateURI);
     return false;
   }
 
-  private static boolean isGerrit(URIish uri) {
-    String scheme = uri.getScheme();
-    return scheme != null && scheme.toLowerCase().equals("gerrit+ssh");
+  private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent()) {
+      adminApi.get().deleteProject(projectName);
+      return;
+    }
+
+    warnCannotPerform("delete project", replicateURI);
+  }
+
+  private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent()) {
+      adminApi.get().updateHead(projectName, newHead);
+      return;
+    }
+
+    warnCannotPerform("update HEAD of project", replicateURI);
+  }
+
+  private void warnCannotPerform(String op, URIish uri) {
+    repLog.warn("Cannot {} on remote site {}.", op, uri);
+  }
+
+  private static class ReferenceUpdatedEvent {
+    private String projectName;
+    private String refName;
+
+    public ReferenceUpdatedEvent(String projectName, String refName) {
+      this.projectName = projectName;
+      this.refName = refName;
+    }
+
+    public String getProjectName() {
+      return projectName;
+    }
+
+    public String getRefName() {
+      return refName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectName, refName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (obj instanceof ReferenceUpdatedEvent)
+          && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName)
+          && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName);
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index cd7a3cf..aa965fe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,7 +37,7 @@
   }
 
   @Override
-  public NameKey getProjectNameKey() {
+  public Project.NameKey getProjectNameKey() {
     return new Project.NameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 6f0803a..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,8 +16,6 @@
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
-import com.google.inject.assistedinject.Assisted;
-import com.google.inject.assistedinject.AssistedInject;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -26,12 +24,7 @@
 
 public class ReplicationState {
 
-  public interface Factory {
-    ReplicationState create(PushResultProcessing processing);
-  }
-
   private boolean allScheduled;
-  private final EventsStorage eventsStorage;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -57,11 +50,7 @@
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
-  private String eventKey;
-
-  @AssistedInject
-  ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
-    eventsStorage = storage;
+  ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
     statusByProjectRef = HashBasedTable.create();
   }
@@ -86,7 +75,6 @@
       URIish uri,
       RefPushResult status,
       RemoteRefUpdate.Status refUpdateStatus) {
-    deleteEvent();
     pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
 
     RefReplicationStatus completedRefStatus = null;
@@ -116,12 +104,6 @@
     }
   }
 
-  private void deleteEvent() {
-    if (eventKey != null) {
-      eventsStorage.delete(eventKey);
-    }
-  }
-
   public void markAllPushTasksScheduled() {
     countingLock.lock();
     try {
@@ -192,8 +174,4 @@
       return name().toLowerCase().replace("_", "-");
     }
   }
-
-  public void setEventKey(String eventKey) {
-    this.eventKey = eventKey;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
new file mode 100644
index 0000000..d7bf227
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+
+public class ReplicationStateListeners implements ReplicationStateListener {
+  private final DynamicSet<ReplicationStateListener> listeners;
+
+  @Inject
+  ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) {
+    this.listeners = stateListeners;
+  }
+
+  @Override
+  public void warn(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.warn(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, Throwable t, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, t, states);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
new file mode 100644
index 0000000..39158f3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -0,0 +1,137 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class ReplicationTasksStorage {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private boolean disableDeleteForTesting;
+
+  public static class ReplicateRefUpdate {
+    public final String project;
+    public final String ref;
+    public final String uri;
+    public final String remote;
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this.project = project;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
+      this.remote = remote;
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+    }
+  }
+
+  private static final Gson GSON = new Gson();
+
+  private final Path refUpdates;
+
+  @Inject
+  ReplicationTasksStorage(ReplicationConfig config) {
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+  }
+
+  public String persist(ReplicateRefUpdate r) {
+    String json = GSON.toJson(r) + "\n";
+    String eventKey = sha1(json).name();
+    Path file = refUpdates().resolve(eventKey);
+
+    if (Files.exists(file)) {
+      return eventKey;
+    }
+
+    try {
+      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.write(file, json.getBytes(UTF_8));
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
+    }
+    return eventKey;
+  }
+
+  @VisibleForTesting
+  public void disableDeleteForTesting(boolean deleteDisabled) {
+    this.disableDeleteForTesting = deleteDisabled;
+  }
+
+  public void delete(ReplicateRefUpdate r) {
+    String taskJson = GSON.toJson(r) + "\n";
+    String taskKey = sha1(taskJson).name();
+    Path file = refUpdates().resolve(taskKey);
+
+    if (disableDeleteForTesting) {
+      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
+      return;
+    }
+
+    try {
+      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.delete(file);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+    }
+  }
+
+  public List<ReplicateRefUpdate> list() {
+    ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+      for (Path e : events) {
+        if (Files.isRegularFile(e)) {
+          String json = new String(Files.readAllBytes(e), UTF_8);
+          result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error when firing pending events");
+    }
+    return result;
+  }
+
+  @SuppressWarnings("deprecation")
+  private ObjectId sha1(String s) {
+    return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+  }
+
+  private Path refUpdates() {
+    try {
+      return Files.createDirectories(refUpdates);
+    } catch (IOException e) {
+      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 2b0c16b..c518091 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -17,6 +17,7 @@
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import java.io.IOException;
+import java.util.Objects;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -49,8 +50,8 @@
 
   @Override
   public SecureCredentialsProvider create(String remoteName) {
-    String user = config.getString("remote", remoteName, "username");
-    String pass = config.getString("remote", remoteName, "password");
+    String user = Objects.toString(config.getString("remote", remoteName, "username"), "");
+    String pass = Objects.toString(config.getString("remote", remoteName, "password"), "");
     return new SecureCredentialsProvider(user, pass);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
index c4294a9..62b4036 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
@@ -20,7 +20,7 @@
 import org.eclipse.jgit.transport.URIish;
 
 /** Looks up a remote's password in secure.config. */
-class SecureCredentialsProvider extends CredentialsProvider {
+public class SecureCredentialsProvider extends CredentialsProvider {
   private final String cfgUser;
   private final String cfgPass;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 2dbc7b7..fa8b44c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -52,15 +52,14 @@
   @Inject private PushAll.Factory pushFactory;
 
   private final Object lock = new Object();
-  @Inject private ReplicationState.Factory replicationStateFactory;
 
   @Override
   protected void run() throws Failure {
-    if (all && projectPatterns.size() > 0) {
+    if (all && !projectPatterns.isEmpty()) {
       throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
     }
 
-    ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
+    ReplicationState state = new ReplicationState(new CommandProcessing(this));
     Future<?> future = null;
 
     ReplicationFilter projectFilter;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
new file mode 100644
index 0000000..ba14299
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public interface TransportFactory {
+
+  Transport open(Repository local, URIish uri) throws NotSupportedException, TransportException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
new file mode 100644
index 0000000..58c1214
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public class TransportFactoryImpl implements TransportFactory {
+
+  @Override
+  public Transport open(Repository git, URIish uri)
+      throws NotSupportedException, TransportException {
+    return Transport.open(git, uri);
+  }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 69a371b..adb8d4c 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -24,7 +24,7 @@
 
 * `refs/users/*` (user branches)
 * `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
 * `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
   slaves)
 
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index c479ab2..c9356b0 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -13,6 +13,11 @@
   sudo su -c 'ssh mirror1.us.some.org echo' gerrit2
 ```
 
+*NOTE:* make sure the local user's ssh keys format is PEM, here how to generate them:
+```
+  ssh-keygen -m PEM -t rsa -C "your_email@example.com"
+```
+
 <a name="example_file">
 Next, create `$site_path/etc/replication.config` as a Git-style config
 file, for example to replicate in parallel to four different hosts:</a>
@@ -283,6 +288,20 @@
 
 	By default, use replication.maxRetries.
 
+remote.NAME.drainQueueAttempts
+:	Maximum number of attempts to drain the replication event queue before
+	stopping the plugin.
+
+	When stopping the plugin, the shutdown will be delayed trying to drain
+	the event queue.
+
+	The maximum delay is "drainQueueAttempts" * "replicationDelay" seconds.
+
+	When not set or set to 0, the queue is not drained and the pending
+	replication events are cancelled.
+
+	By default, do not drain replication events.
+
 remote.NAME.threads
 :	Number of worker threads to dedicate to pushing to the
 	repositories described by this remote.  Each thread can push
@@ -309,7 +328,7 @@
 	If the remote site was not available at the moment when a new
 	project was created, it will be created if during the replication
 	of a ref it is found to be missing.
-	
+
 	If false, repositories are never created automatically on this
 	remote.
 
@@ -409,7 +428,7 @@
 File `~/.ssh/config`
 --------------------
 
-If present, Gerrit reads and caches `~/.ssh/config` at startup, and
+Gerrit reads and caches the `~/.ssh/config` at startup, and
 supports most SSH configuration options.  For example:
 
 ```
@@ -423,6 +442,15 @@
     PreferredAuthentications publickey
 ```
 
+*IdentityFile* and *PreferredAuthentications* must be defined for all the hosts.
+Here an example of the minimum `~/.ssh/config` needed:
+
+```
+  Host *
+    IdentityFile ~/.ssh/id_rsa
+    PreferredAuthentications publickey
+```
+
 Supported options:
 
   * Host
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
new file mode 100644
index 0000000..076aded
--- /dev/null
+++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,53 @@
+@PLUGIN@ extension points
+==============
+
+The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+implemented by another plugin by declaring a `provided` dependency from the replication plugin.
+
+### Install extension libModule
+
+The replication plugin's extension points are defined in the `c.g.g.p.r.ReplicationExtensionPointModule`
+that needs to be configured as libModule.
+
+Create a symbolic link from `$GERRIT_SITE/plugins/replication.jar` into `$GERRIT_SITE/lib`
+and then add the replication extension module to the `gerrit.config`.
+
+Example:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+> **NOTE**: Use and configuration of the replication plugin as library module requires a Gerrit server restart and does not support hot plugin install or upgrade.
+
+
+### Extension points
+
+* `com.googlesource.gerrit.plugins.replication.ReplicationPushFilter`
+
+  Filter out the ref updates pushed to a remote instance.
+  Only one filter at a time is supported. Filter implementation needs to bind a `DynamicItem`.
+
+  Default: no filtering
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(), ReplicationPushFilter.class).to(ReplicationPushFilterImpl.class);
+  ```
+
+* `com.googlesource.gerrit.plugins.replication.AdminApiFactory`
+
+  Create an instance of `AdminApi` for a given remote URL. The default implementation
+  provides API instances for local FS, remote SSH, and remote Gerrit.
+
+  Only one factory at a time is supported. The implementation needs to be bound as a
+  `DynamicItem`.
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(), AdminApiFactory.class).to(AdminApiFactoryImpl.class);
+  ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 337bd1d..5fa7b98 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -22,8 +22,8 @@
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.client.KeyUtil;
 import com.google.gwtorm.server.OrmException;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
new file mode 100644
index 0000000..af065b3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -0,0 +1,375 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.IdGenerator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.AssertionFailedError;
+import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.PushConnection;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PushOneTest {
+  private static final int TEST_PUSH_TIMEOUT_SECS = 10;
+
+  private GitRepositoryManager gitRepositoryManagerMock;
+  private Repository repositoryMock;
+  private PermissionBackend permissionBackendMock;
+  private PermissionBackend.WithUser withUserMock;
+  private PermissionBackend.ForProject forProjectMock;
+
+  private Destination destinationMock;
+  private RemoteConfig remoteConfigMock;
+  private RefSpec refSpecMock;
+  private CredentialsFactory credentialsFactory;
+  private PerThreadRequestScope.Scoper threadRequestScoperMock;
+  private ReplicationQueue replicationQueueMock;
+  private IdGenerator idGeneratorMock;
+  private ReplicationStateListeners replicationStateListenersMock;
+  private ReplicationMetrics replicationMetricsMock;
+  private Timer1.Context timerContextMock;
+  private ProjectCache projectCacheMock;
+  private TransportFactory transportFactoryMock;
+  private Transport transportMock;
+  private FetchConnection fetchConnection;
+  private PushConnection pushConnection;
+  private ProjectState projectStateMock;
+  private RefUpdate refUpdateMock;
+
+  private Project.NameKey projectNameKey;
+  private URIish urish;
+  private Map<String, Ref> localRefs;
+
+  private Map<String, Ref> remoteRefs;
+  private CountDownLatch isCallFinished;
+  private Ref newLocalRef;
+
+  @Before
+  public void setup() throws Exception {
+    projectNameKey = new Project.NameKey("fooProject");
+    urish = new URIish("http://foo.com/fooProject.git");
+
+    newLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
+
+    localRefs = new HashMap<>();
+    localRefs.put("fooProject", newLocalRef);
+
+    Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
+    remoteRefs = new HashMap<>();
+    remoteRefs.put("fooProject", remoteRef);
+
+    isCallFinished = new CountDownLatch(1);
+
+    setupMocks();
+  }
+
+  private void setupMocks() throws Exception {
+    FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
+    config.setString("remote", "Replication", "push", "foo");
+
+    setupRefUpdateMock();
+    setupRepositoryMock(config);
+    setupGitRepoManagerMock();
+
+    projectStateMock = createNiceMock(ProjectState.class);
+    forProjectMock = createNiceMock(ForProject.class);
+    setupWithUserMock();
+    setupPermissionBackedMock();
+
+    setupDestinationMock();
+
+    setupRefSpecMock();
+    setupRemoteConfigMock();
+
+    credentialsFactory = createNiceMock(CredentialsFactory.class);
+
+    setupFetchConnectionMock();
+    setupPushConnectionMock();
+    setupRequestScopeMock();
+    replicationQueueMock = createNiceMock(ReplicationQueue.class);
+    idGeneratorMock = createNiceMock(IdGenerator.class);
+    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+
+    timerContextMock = createNiceMock(Timer1.Context.class);
+    setupReplicationMetricsMock();
+
+    setupTransportMock();
+
+    setupProjectCacheMock();
+
+    replay(
+        gitRepositoryManagerMock,
+        refUpdateMock,
+        repositoryMock,
+        permissionBackendMock,
+        destinationMock,
+        remoteConfigMock,
+        credentialsFactory,
+        threadRequestScoperMock,
+        replicationQueueMock,
+        idGeneratorMock,
+        replicationStateListenersMock,
+        replicationMetricsMock,
+        projectCacheMock,
+        timerContextMock,
+        transportFactoryMock,
+        projectStateMock,
+        withUserMock,
+        forProjectMock,
+        fetchConnection,
+        pushConnection,
+        refSpecMock);
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFilters() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(DynamicItem.itemOf(ReplicationPushFilter.class, null));
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFiltersSetup() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(null);
+  }
+
+  private void shouldPushAllRefsWithDynamicItemFilter(
+      DynamicItem<ReplicationPushFilter> replicationPushFilter)
+      throws IOException, NotSupportedException, TransportException, InterruptedException {
+    List<RemoteRefUpdate> expectedUpdates =
+        Arrays.asList(
+            new RemoteRefUpdate(
+                repositoryMock,
+                newLocalRef.getName(),
+                newLocalRef.getObjectId(),
+                "fooProject",
+                false,
+                "fooProject",
+                null));
+
+    PushResult pushResult = new PushResult();
+
+    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
+        .andReturn(pushResult)
+        .once();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  @Test
+  public void shouldBlockReplicationUsingPushFilter() throws InterruptedException, IOException {
+    DynamicItem<ReplicationPushFilter> replicationPushFilter =
+        DynamicItem.itemOf(
+            ReplicationPushFilter.class,
+            new ReplicationPushFilter() {
+
+              @Override
+              public List<RemoteRefUpdate> filter(
+                  String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+                return Collections.emptyList();
+              }
+            });
+
+    // easymock way to check if method was never called
+    expect(transportMock.push(anyObject(), anyObject()))
+        .andThrow(new AssertionFailedError())
+        .anyTimes();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    PushOne push =
+        new PushOne(
+            gitRepositoryManagerMock,
+            permissionBackendMock,
+            destinationMock,
+            remoteConfigMock,
+            credentialsFactory,
+            threadRequestScoperMock,
+            replicationQueueMock,
+            idGeneratorMock,
+            replicationStateListenersMock,
+            replicationMetricsMock,
+            projectCacheMock,
+            transportFactoryMock,
+            projectNameKey,
+            urish);
+
+    push.setReplicationPushFilter(replicationPushFilter);
+    return push;
+  }
+
+  private void setupProjectCacheMock() throws IOException {
+    projectCacheMock = createNiceMock(ProjectCache.class);
+    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+  }
+
+  private void setupTransportMock() throws NotSupportedException, TransportException {
+    transportMock = createNiceMock(Transport.class);
+    expect(transportMock.openFetch()).andReturn(fetchConnection);
+    transportFactoryMock = createNiceMock(TransportFactory.class);
+    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+  }
+
+  private void setupReplicationMetricsMock() {
+    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
+    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+  }
+
+  private void setupRequestScopeMock() {
+    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
+    expect(threadRequestScoperMock.scope(anyObject()))
+        .andAnswer(
+            new IAnswer<Callable<Object>>() {
+              @SuppressWarnings("unchecked")
+              @Override
+              public Callable<Object> answer() throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+                return new Callable<Object>() {
+
+                  @Override
+                  public Object call() throws Exception {
+                    Object result = originalCall.call();
+                    isCallFinished.countDown();
+                    return result;
+                  }
+                };
+              }
+            })
+        .anyTimes();
+  }
+
+  private void setupPushConnectionMock() {
+    pushConnection = createNiceMock(PushConnection.class);
+    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupFetchConnectionMock() {
+    fetchConnection = createNiceMock(FetchConnection.class);
+    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupRemoteConfigMock() {
+    remoteConfigMock = createNiceMock(RemoteConfig.class);
+    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+  }
+
+  private void setupRefSpecMock() {
+    refSpecMock = createNiceMock(RefSpec.class);
+    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
+    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
+    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
+    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+  }
+
+  private void setupDestinationMock() {
+    destinationMock = createNiceMock(Destination.class);
+    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+  }
+
+  private void setupPermissionBackedMock() {
+    permissionBackendMock = createNiceMock(PermissionBackend.class);
+    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+  }
+
+  private void setupWithUserMock() {
+    withUserMock = createNiceMock(WithUser.class);
+    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+  }
+
+  private void setupGitRepoManagerMock() throws IOException {
+    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
+    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void setupRepositoryMock(FileBasedConfig config) throws IOException {
+    repositoryMock = createNiceMock(Repository.class);
+    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
+    expect(repositoryMock.getAllRefs()).andReturn(localRefs);
+    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+  }
+
+  private void setupRefUpdateMock() {
+    refUpdateMock = createNiceMock(RefUpdate.class);
+    expect(refUpdateMock.getOldObjectId())
+        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
+        .anyTimes();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
new file mode 100644
index 0000000..111a792
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
+  Collection<RemoteRefUpdate> expectedRemoteRefs;
+
+  public static Collection<RemoteRefUpdate> eqRemoteRef(
+      Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
+    return null;
+  }
+
+  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    this.expectedRemoteRefs = expectedRemoteRefs;
+  }
+
+  @Override
+  public boolean matches(Object argument) {
+    if (!(argument instanceof Collection)) return false;
+
+    @SuppressWarnings("unchecked")
+    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
+
+    if (expectedRemoteRefs.size() != refs.size()) return false;
+    return refs.stream()
+        .allMatch(
+            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("expected:" + expectedRemoteRefs.toString());
+  }
+
+  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
+    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
+        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
+        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
+        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
+        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
+        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
+        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
+        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
new file mode 100644
index 0000000..61a53f3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -0,0 +1,377 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.common.ProjectInfo;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    config.save();
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    tasksStorage.disableDeleteForTesting(true);
+  }
+
+  @Test
+  public void shouldReplicateNewProject() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey sourceProject = createTestProject("foo");
+
+    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
+
+    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+
+    ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
+    assertThat(replicaProject).isNotNull();
+  }
+
+  @Test
+  public void shouldReplicateNewChangeRef() throws Exception {
+    Project.NameKey targetProject = createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject("projectreplica1");
+    Project.NameKey targetProject2 = createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject("projectreplica1");
+    createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    PushResultProcessing pushResultProcessing =
+        new PushResultProcessing() {
+
+          @Override
+          void onRefReplicatedToOneNode(
+              String project,
+              String ref,
+              URIish uri,
+              ReplicationState.RefPushResult status,
+              RemoteRefUpdate.Status refStatus) {}
+
+          @Override
+          void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
+
+          @Override
+          void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
+        };
+
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(pushResultProcessing), true);
+
+    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+  }
+
+  @Test
+  public void shouldReplicateHeadUpdate() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newHead = "refs/heads/newhead";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newHead).create(input);
+    gApi.projects().name(project.get()).head(newHead);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+      Ref targetProjectHead = getRef(repo, Constants.HEAD);
+      assertThat(targetProjectHead).isNotNull();
+      assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+    }
+  }
+
+  @Test
+  public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "doNotDrainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThrows(
+        InterruptedException.class,
+        () -> {
+          try (Repository repo = repoManager.openRepository(targetProject)) {
+            waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+          }
+        });
+  }
+
+  @Test
+  public void shouldDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "drainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    config.setInt("remote", remoteName, "drainQueueAttempts", 2);
+    config.save();
+    reloadConfig();
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return createProject(name);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+  }
+
+  private void shutdownConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+  }
+
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+
+  private void cleanupReplicationTasks() throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+      for (Path path : files) {
+        path.toFile().delete();
+      }
+    }
+  }
+
+  private boolean projectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
index 881a282..2a395ca 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -56,13 +56,13 @@
   private FileBasedConfig config;
 
   @Override
-  public void setUp() throws Exception {
+  public void setUpTestPlugin() throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
     config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
 
     setReplicationDestination("foo", "replica");
-    super.setUp();
+    super.setUpTestPlugin();
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index cf6715e..193af1e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,15 +32,12 @@
 
   private ReplicationState replicationState;
   private PushResultProcessing pushResultProcessingMock;
-  private EventsStorage eventsStorage;
 
   @Before
   public void setUp() throws Exception {
     pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
     replay(pushResultProcessingMock);
-    eventsStorage = createNiceMock(EventsStorage.class);
-    replay(eventsStorage);
-    replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
+    replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
new file mode 100644
index 0000000..586b56c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.function.Supplier;
+
+public class WaitUtil {
+  public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        throw new InterruptedException();
+      }
+      MILLISECONDS.sleep(50);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
new file mode 100644
index 0000000..0ccb0af
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.WaitUtil.waitUntil;
+
+import java.time.Duration;
+import org.junit.Test;
+
+public class WaitUtilTest {
+
+  @Test
+  public void shouldFailWhenConditionNotMetWithinTimeout() throws Exception {
+    assertThrows(
+        InterruptedException.class,
+        () -> waitUntil(() -> returnTrue() == false, Duration.ofSeconds(1)));
+  }
+
+  @Test
+  public void shouldNotFailWhenConditionIsMetWithinTimeout() throws Exception {
+    waitUntil(() -> returnTrue() == true, Duration.ofSeconds(1));
+  }
+
+  private static boolean returnTrue() {
+    return true;
+  }
+}