Merge branch 'stable-2.15' into stable-2.16

* stable-2.15:
  Fix issue with dropping events on start

Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/BUILD b/BUILD
index 0a54c3e..50615d8 100644
--- a/BUILD
+++ b/BUILD
@@ -21,7 +21,10 @@
 
 junit_tests(
     name = "replication_tests",
-    srcs = glob(["src/test/java/**/*Test.java"]),
+    srcs = glob([
+        "src/test/java/**/*Test.java",
+        "src/test/java/**/*IT.java",
+    ]),
     tags = ["replication"],
     visibility = ["//visibility:public"],
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
@@ -32,7 +35,7 @@
 
 java_library(
     name = "replication_util",
-    testonly = 1,
+    testonly = True,
     srcs = glob(
         ["src/test/java/**/*.java"],
         exclude = ["src/test/java/**/*Test.java"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
new file mode 100644
index 0000000..acbf763
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.reviewdb.client.Project;
+
+public interface AdminApi {
+  public boolean createProject(Project.NameKey project, String head);
+
+  public boolean deleteProject(Project.NameKey project);
+
+  public boolean updateHead(Project.NameKey project, String newHead);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
new file mode 100644
index 0000000..de6e91e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Factory for creating an {@link AdminApi} instance for a remote URI. */
+public interface AdminApiFactory {
+  /**
+   * Create an {@link AdminApi} for the given remote URI.
+   *
+   * @param uri the remote URI.
+   * @return An API for the given remote URI, or {@code Optional.empty} if there is no appropriate
+   *     API for the URI.
+   */
+  Optional<AdminApi> create(URIish uri);
+
+  @Singleton
+  static class DefaultAdminApiFactory implements AdminApiFactory {
+    protected final SshHelper sshHelper;
+
+    @Inject
+    public DefaultAdminApiFactory(SshHelper sshHelper) {
+      this.sshHelper = sshHelper;
+    }
+
+    @Override
+    public Optional<AdminApi> create(URIish uri) {
+      if (isGerrit(uri)) {
+        return Optional.of(new GerritSshApi(sshHelper, uri));
+      } else if (!uri.isRemote()) {
+        return Optional.of(new LocalFS(uri));
+      } else if (isSSH(uri)) {
+        return Optional.of(new RemoteSsh(sshHelper, uri));
+      }
+      return Optional.empty();
+    }
+  }
+
+  static boolean isGerrit(URIish uri) {
+    String scheme = uri.getScheme();
+    return scheme != null && scheme.toLowerCase().equals("gerrit+ssh");
+  }
+
+  static boolean isSSH(URIish uri) {
+    if (!uri.isRemote()) {
+      return false;
+    }
+    String scheme = uri.getScheme();
+    if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+      return true;
+    }
+    if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 5d6e409..02daa6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -13,36 +13,50 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.FileUtil;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Singleton
 public class AutoReloadConfigDecorator implements ReplicationConfig {
-  private static final Logger log = LoggerFactory.getLogger(AutoReloadConfigDecorator.class);
-  private ReplicationFileBasedConfig currentConfig;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private volatile ReplicationFileBasedConfig currentConfig;
   private long currentConfigTs;
+  private long lastFailedConfigTs;
 
   private final SitePaths site;
-  private final WorkQueue workQueue;
-  private final DestinationFactory destinationFactory;
+  private final Destination.Factory destinationFactory;
+  private final Path pluginDataDir;
+  // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
+  // ReplicationConfig
+  private final Provider<ReplicationQueue> replicationQueue;
+
+  private volatile boolean shuttingDown;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory)
+      SitePaths site,
+      Destination.Factory destinationFactory,
+      Provider<ReplicationQueue> replicationQueue,
+      @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
     this.site = site;
     this.destinationFactory = destinationFactory;
+    this.pluginDataDir = pluginDataDir;
     this.currentConfig = loadConfig();
     this.currentConfigTs = getLastModified(currentConfig);
-    this.workQueue = workQueue;
+    this.replicationQueue = replicationQueue;
   }
 
   private static long getLastModified(ReplicationFileBasedConfig cfg) {
@@ -50,7 +64,7 @@
   }
 
   private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory);
+    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
   }
 
   private synchronized boolean isAutoReload() {
@@ -64,25 +78,42 @@
   }
 
   private void reloadIfNeeded() {
-    try {
-      if (isAutoReload()) {
-        long lastModified = getLastModified(currentConfig);
-        if (lastModified > currentConfigTs) {
-          ReplicationFileBasedConfig newConfig = loadConfig();
-          newConfig.startup(workQueue);
-          int discarded = currentConfig.shutdown();
+    reload(false);
+  }
 
-          this.currentConfig = newConfig;
-          this.currentConfigTs = lastModified;
-          log.info(
-              "Configuration reloaded: {} destinations, {} replication events discarded",
-              currentConfig.getDestinations(FilterType.ALL).size(),
-              discarded);
+  @VisibleForTesting
+  public void forceReload() {
+    reload(true);
+  }
+
+  private void reload(boolean force) {
+    if (force || isAutoReload()) {
+      ReplicationQueue queue = replicationQueue.get();
+
+      long lastModified = getLastModified(currentConfig);
+      try {
+        if (force
+            || (!shuttingDown
+                && lastModified > currentConfigTs
+                && lastModified > lastFailedConfigTs
+                && queue.isRunning()
+                && !queue.isReplaying())) {
+          queue.stop();
+          currentConfig = loadConfig();
+          currentConfigTs = lastModified;
+          lastFailedConfigTs = 0;
+          logger.atInfo().log(
+              "Configuration reloaded: %d destinations",
+              currentConfig.getDestinations(FilterType.ALL).size());
         }
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot reload replication configuration: keeping existing settings");
+        lastFailedConfigTs = lastModified;
+        return;
+      } finally {
+        queue.start();
       }
-    } catch (Exception e) {
-      log.error("Cannot reload replication configuration: keeping existing settings", e);
-      return;
     }
   }
 
@@ -102,12 +133,32 @@
   }
 
   @Override
-  public synchronized int shutdown() {
+  public Path getEventsDirectory() {
+    return currentConfig.getEventsDirectory();
+  }
+
+  /* shutdown() cannot be set as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when enabling the drain of replication events before
+   * shutdown.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never hold waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    this.shuttingDown = true;
     return currentConfig.shutdown();
   }
 
   @Override
   public synchronized void startup(WorkQueue workQueue) {
+    shuttingDown = false;
     currentConfig.startup(workQueue);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
index f8737b6..29a7ee6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -16,18 +16,16 @@
 
 import static com.google.gerrit.common.FileUtil.lastModified;
 
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicReference;
 import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class AutoReloadSecureCredentialsFactoryDecorator implements CredentialsFactory {
-  private static final Logger log =
-      LoggerFactory.getLogger(AutoReloadSecureCredentialsFactoryDecorator.class);
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory;
   private volatile long secureCredentialsFactoryLoadTs;
@@ -58,13 +56,12 @@
         secureCredentialsFactory.compareAndSet(
             secureCredentialsFactory.get(), new SecureCredentialsFactory(site));
         secureCredentialsFactoryLoadTs = getSecureConfigLastEditTs();
-        log.info("secure.config reloaded as it was updated on the file system");
+        logger.atInfo().log("secure.config reloaded as it was updated on the file system");
       }
     } catch (Exception e) {
-      log.error(
+      logger.atSevere().withCause(e).log(
           "Unexpected error while trying to reload "
-              + "secure.config: keeping existing credentials",
-          e);
+              + "secure.config: keeping existing credentials");
     }
 
     return secureCredentialsFactory.get().create(remoteName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 0a06093..36960a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -24,9 +24,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.common.data.GroupReference;
-import com.google.gerrit.extensions.client.ProjectState;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -42,6 +40,7 @@
 import com.google.gerrit.server.account.GroupIncludeCache;
 import com.google.gerrit.server.account.ListGroupMembership;
 import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.WorkQueue;
@@ -50,15 +49,18 @@
 import com.google.gerrit.server.permissions.ProjectPermission;
 import com.google.gerrit.server.permissions.RefPermission;
 import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.PerRequestProjectControlCache;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.RequestContext;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
+import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -83,18 +85,25 @@
 
 public class Destination {
   private static final Logger repLog = ReplicationQueue.repLog;
+
+  public interface Factory {
+    Destination create(DestinationConfiguration config);
+  }
+
   private final ReplicationStateListener stateLog;
   private final Object stateLock = new Object();
   private final Map<URIish, PushOne> pending = new HashMap<>();
   private final Map<URIish, PushOne> inFlight = new HashMap<>();
   private final PushOne.Factory opFactory;
-  private final ProjectControl.Factory projectControlFactory;
   private final GitRepositoryManager gitManager;
   private final PermissionBackend permissionBackend;
+  private final Provider<CurrentUser> userProvider;
+  private final ProjectCache projectCache;
   private volatile ScheduledExecutorService pool;
   private final PerThreadRequestScope.Scoper threadScoper;
   private final DestinationConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Provider<ReplicationTasksStorage> replicationTasksStorage;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -112,23 +121,28 @@
     }
   }
 
+  @Inject
   protected Destination(
       Injector injector,
-      DestinationConfiguration cfg,
-      RemoteSiteUser.Factory replicationUserFactory,
       PluginUser pluginUser,
       GitRepositoryManager gitRepositoryManager,
       PermissionBackend permissionBackend,
+      Provider<CurrentUser> userProvider,
+      ProjectCache projectCache,
       GroupBackend groupBackend,
-      ReplicationStateListener stateLog,
+      ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
-    config = cfg;
+      DynamicItem<EventDispatcher> eventDispatcher,
+      Provider<ReplicationTasksStorage> rts,
+      @Assisted DestinationConfiguration cfg) {
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
     this.permissionBackend = permissionBackend;
+    this.userProvider = userProvider;
+    this.projectCache = projectCache;
     this.stateLog = stateLog;
-
+    this.replicationTasksStorage = rts;
+    config = cfg;
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
       ImmutableSet.Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
@@ -141,7 +155,7 @@
           repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
         }
       }
-      remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build()));
+      remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
     } else {
       remoteUser = pluginUser;
     }
@@ -153,7 +167,6 @@
               protected void configure() {
                 bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
                 bind(PerThreadRequestScope.Propagator.class);
-                bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
 
                 bind(Destination.class).toInstance(Destination.this);
                 bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
@@ -185,7 +198,6 @@
               }
             });
 
-    projectControlFactory = child.getInstance(ProjectControl.Factory.class);
     opFactory = child.getInstance(PushOne.Factory.class);
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
   }
@@ -245,15 +257,21 @@
     }
   }
 
-  private boolean shouldReplicate(ProjectControl ctl) throws PermissionBackendException {
-    if (!config.replicateHiddenProjects() && ctl.getProject().getState() == ProjectState.HIDDEN) {
+  private boolean shouldReplicate(ProjectState state, CurrentUser user)
+      throws PermissionBackendException {
+    if (!config.replicateHiddenProjects()
+        && state.getProject().getState()
+            == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
       return false;
     }
+
+    // Hidden projects(permitsRead = false) should only be accessible by the project owners.
+    // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also
+    // be allowed for other users).
+    ProjectPermission permissionToCheck =
+        state.statePermitsRead() ? ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG;
     try {
-      permissionBackend
-          .user(ctl.getUser())
-          .project(ctl.getProject().getNameKey())
-          .check(ProjectPermission.ACCESS);
+      permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
       return true;
     } catch (AuthException e) {
       return false;
@@ -268,8 +286,19 @@
               new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws NoSuchProjectException, PermissionBackendException {
-                  ProjectControl projectControl = controlFor(project);
-                  if (!shouldReplicate(projectControl)) {
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  if (!projectState.statePermitsRead()) {
+                    return false;
+                  }
+                  if (!shouldReplicate(projectState, userProvider.get())) {
                     return false;
                   }
                   if (PushOne.ALL_REFS.equals(ref)) {
@@ -277,7 +306,7 @@
                   }
                   try {
                     permissionBackend
-                        .user(projectControl.getUser())
+                        .user(userProvider.get())
                         .project(project)
                         .ref(ref)
                         .check(RefPermission.READ);
@@ -304,7 +333,16 @@
               new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws NoSuchProjectException, PermissionBackendException {
-                  return shouldReplicate(controlFor(project));
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  return shouldReplicate(projectState, userProvider.get());
                 }
               })
           .call();
@@ -354,21 +392,21 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = getPendingPush(uri);
-      if (e == null) {
-        e = opFactory.create(project, uri);
-        addRef(e, ref);
-        e.addState(ref, state);
+      PushOne task = getPendingPush(uri);
+      if (task == null) {
+        task = opFactory.create(project, uri);
+        addRef(task, ref);
+        task.addState(ref, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
-            pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        pending.put(uri, e);
-      } else if (!e.getRefs().contains(ref)) {
-        addRef(e, ref);
-        e.addState(ref, state);
+            pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        pending.put(uri, task);
+      } else if (!task.getRefs().contains(ref)) {
+        addRef(task, ref);
+        task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
     }
   }
 
@@ -492,10 +530,6 @@
     }
   }
 
-  ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException {
-    return projectControlFactory.controlFor(project);
-  }
-
   RunwayStatus requestRunway(PushOne op) {
     synchronized (stateLock) {
       if (op.wasCanceled()) {
@@ -511,12 +545,31 @@
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne op) {
+  void notifyFinished(PushOne task) {
     synchronized (stateLock) {
-      inFlight.remove(op.getURI());
+      inFlight.remove(task.getURI());
+      if (!task.wasCanceled()) {
+        for (String ref : task.getRefs()) {
+          if (!refHasPendingPush(task.getURI(), ref)) {
+            replicationTasksStorage
+                .get()
+                .delete(
+                    new ReplicateRefUpdate(
+                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+          }
+        }
+      }
     }
   }
 
+  private boolean refHasPendingPush(URIish opUri, String ref) {
+    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+  }
+
+  private boolean pushContainsRef(PushOne op, String ref) {
+    return op != null && op.getRefs().contains(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
@@ -649,6 +702,14 @@
     return config.getMaxRetries();
   }
 
+  public int getDrainQueueAttempts() {
+    return config.getDrainQueueAttempts();
+  }
+
+  public int getReplicationDelaySeconds() {
+    return config.getDelay() * 1000;
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index b2d0de2..f688cfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@
 public class DestinationConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
+  static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
 
   private final int delay;
   private final int rescheduleDelay;
   private final int retryDelay;
+  private final int drainQueueAttempts;
   private final int lockErrorMaxRetries;
   private final ImmutableList<String> adminUrls;
   private final int poolThreads;
@@ -50,6 +52,8 @@
     projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
     adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
     retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+    drainQueueAttempts =
+        Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
     poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
     authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
     lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
@@ -77,6 +81,10 @@
     return retryDelay;
   }
 
+  public int getDrainQueueAttempts() {
+    return drainQueueAttempts;
+  }
+
   public int getPoolThreads() {
     return poolThreads;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
deleted file mode 100644
index 83eab86..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import com.google.gerrit.common.EventDispatcher;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.GroupBackend;
-import com.google.gerrit.server.account.GroupIncludeCache;
-import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.permissions.PermissionBackend;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-
-@Singleton
-public class DestinationFactory {
-  private final Injector injector;
-  private final RemoteSiteUser.Factory replicationUserFactory;
-  private final PluginUser pluginUser;
-  private final GitRepositoryManager gitRepositoryManager;
-  private final PermissionBackend permissionBackend;
-  private final GroupBackend groupBackend;
-  private final ReplicationStateListener stateLog;
-  private final GroupIncludeCache groupIncludeCache;
-  private final DynamicItem<EventDispatcher> eventDispatcher;
-
-  @Inject
-  public DestinationFactory(
-      Injector injector,
-      RemoteSiteUser.Factory replicationUserFactory,
-      PluginUser pluginUser,
-      GitRepositoryManager gitRepositoryManager,
-      PermissionBackend permissionBackend,
-      GroupBackend groupBackend,
-      ReplicationStateListener stateLog,
-      GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
-    this.injector = injector;
-    this.replicationUserFactory = replicationUserFactory;
-    this.pluginUser = pluginUser;
-    this.gitRepositoryManager = gitRepositoryManager;
-    this.permissionBackend = permissionBackend;
-    this.groupBackend = groupBackend;
-    this.stateLog = stateLog;
-    this.groupIncludeCache = groupIncludeCache;
-    this.eventDispatcher = eventDispatcher;
-  }
-
-  Destination create(DestinationConfiguration config) {
-    return new Destination(
-        injector,
-        config,
-        replicationUserFactory,
-        pluginUser,
-        gitRepositoryManager,
-        permissionBackend,
-        groupBackend,
-        stateLog,
-        groupIncludeCache,
-        eventDispatcher);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index b46a0d9..46c8892 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -14,33 +14,34 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.ssh.SshAddressesModule;
-import com.google.inject.Inject;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class GerritSshApi {
+public class GerritSshApi implements AdminApi {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
   static int SSH_COMMAND_FAILED = -1;
-  private static final Logger log = LoggerFactory.getLogger(GerritSshApi.class);
   private static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
 
-  private final SshHelper sshHelper;
+  protected final SshHelper sshHelper;
+  protected final URIish uri;
 
   private final Set<URIish> withoutDeleteProjectPlugin = new HashSet<>();
 
-  @Inject
-  protected GerritSshApi(SshHelper sshHelper) {
+  protected GerritSshApi(SshHelper sshHelper, URIish uri) {
     this.sshHelper = sshHelper;
+    this.uri = uri;
   }
 
-  protected boolean createProject(URIish uri, Project.NameKey projectName, String head) {
+  @Override
+  public boolean createProject(Project.NameKey projectName, String head) {
     OutputStream errStream = sshHelper.newErrorBufferStream();
     String cmd = "gerrit create-project --branch " + head + " " + projectName.get();
     try {
@@ -52,7 +53,8 @@
     return true;
   }
 
-  protected boolean deleteProject(URIish uri, Project.NameKey projectName) {
+  @Override
+  public boolean deleteProject(Project.NameKey projectName) {
     if (!withoutDeleteProjectPlugin.contains(uri)) {
       OutputStream errStream = sshHelper.newErrorBufferStream();
       String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get();
@@ -64,30 +66,23 @@
         return false;
       }
       if (exitCode == 1) {
-        log.info(
-            "DeleteProject plugin is not installed on {}; will not try to forward this operation to that host");
+        logger.atInfo().log(
+            "DeleteProject plugin is not installed on %s;"
+                + " will not try to forward this operation to that host");
         withoutDeleteProjectPlugin.add(uri);
-        return true;
       }
     }
     return true;
   }
 
-  protected boolean updateHead(URIish uri, Project.NameKey projectName, String newHead) {
+  @Override
+  public boolean updateHead(Project.NameKey projectName, String newHead) {
     OutputStream errStream = sshHelper.newErrorBufferStream();
     String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead;
     try {
       execute(uri, cmd, errStream);
     } catch (IOException e) {
-      log.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n  Command: {}\n  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream,
-          e);
+      logError("updating HEAD of", uri, errStream, cmd, e);
       return false;
     }
     return true;
@@ -114,19 +109,14 @@
       URIish sshUri = toSshUri(uri);
       return sshHelper.executeRemoteSsh(sshUri, cmd, errStream);
     } catch (URISyntaxException e) {
-      log.error("Cannot convert {} to SSH uri", uri, e);
+      logger.atSevere().withCause(e).log("Cannot convert %s to SSH uri", uri);
     }
     return SSH_COMMAND_FAILED;
   }
 
   public void logError(String msg, URIish uri, OutputStream errStream, String cmd, IOException e) {
-    log.error(
-        "Error {} remote repository at {}:\n  Exception: {}\n  Command: {}\n  Output: {}",
-        msg,
-        uri,
-        e,
-        cmd,
-        errStream,
-        e);
+    logger.atSevere().withCause(e).log(
+        "Error %s remote repository at %s:\n  Exception: %s\n  Command: %s\n  Output: %s",
+        msg, uri, e, cmd, errStream);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
new file mode 100644
index 0000000..aa6e16c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.File;
+import java.io.IOException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+
+public class LocalFS implements AdminApi {
+
+  private final URIish uri;
+
+  public LocalFS(URIish uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      repo.create(true /* bare */);
+
+      if (head != null && head.startsWith(Constants.R_REFS)) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.disableRefLog();
+        u.link(head);
+      }
+      repLog.info("Created local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error creating local repository {}", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    try {
+      recursivelyDelete(new File(uri.getPath()));
+      repLog.info("Deleted local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      if (newHead != null) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.link(newHead);
+      }
+    } catch (IOException e) {
+      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      return false;
+    }
+    return true;
+  }
+
+  private static void recursivelyDelete(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (File d : contents) {
+        if (d.isDirectory()) {
+          recursivelyDelete(d);
+        } else {
+          if (!d.delete()) {
+            throw new IOException("Failed to delete: " + d.getAbsolutePath());
+          }
+        }
+      }
+    }
+    if (!dir.delete()) {
+      throw new IOException("Failed to delete: " + dir.getAbsolutePath());
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 227804d..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -15,10 +15,10 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.util.concurrent.Atomics;
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import java.util.concurrent.Future;
@@ -29,7 +29,6 @@
   private final AtomicReference<Future<?>> pushAllFuture;
   private final ServerInformation srvInfo;
   private final PushAll.Factory pushAll;
-  private final ReplicationQueue queue;
   private final ReplicationConfig config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
 
@@ -37,12 +36,10 @@
   protected OnStartStop(
       ServerInformation srvInfo,
       PushAll.Factory pushAll,
-      ReplicationQueue queue,
       ReplicationConfig config,
       DynamicItem<EventDispatcher> eventDispatcher) {
     this.srvInfo = srvInfo;
     this.pushAll = pushAll;
-    this.queue = queue;
     this.config = config;
     this.eventDispatcher = eventDispatcher;
     this.pushAllFuture = Atomics.newReference();
@@ -50,8 +47,6 @@
 
   @Override
   public void start() {
-    queue.start();
-
     if (srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
       ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
@@ -68,6 +63,5 @@
     if (f != null) {
       f.cancel(true);
     }
-    queue.stop();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index db067e2..833b02b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -43,7 +43,7 @@
       WorkQueue wq,
       ProjectCache projectCache,
       ReplicationQueue rq,
-      ReplicationStateListener stateLog,
+      ReplicationStateListeners stateLog,
       @Assisted @Nullable String urlMatch,
       @Assisted ReplicationFilter filter,
       @Assisted ReplicationState state,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 5f0c066..5794f6e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,20 +22,23 @@
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
 import com.google.gerrit.server.git.ProjectRunnable;
-import com.google.gerrit.server.git.VisibleRefFilter;
 import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
+import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.permissions.ProjectPermission;
-import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -92,7 +95,6 @@
   private final RemoteConfig config;
   private final CredentialsProvider credentialsProvider;
   private final PerThreadRequestScope.Scoper threadScoper;
-  private final VisibleRefFilter.Factory refFilterFactory;
   private final ReplicationQueue replicationQueue;
 
   private final Project.NameKey projectName;
@@ -110,7 +112,10 @@
   private final int id;
   private final long createdAt;
   private final ReplicationMetrics metrics;
+  private final ProjectCache projectCache;
   private final AtomicBoolean canceledWhileRunning;
+  private final TransportFactory transportFactory;
+  private DynamicItem<ReplicationPushFilter> replicationPushFilter;
 
   @Inject
   PushOne(
@@ -118,20 +123,20 @@
       PermissionBackend permissionBackend,
       Destination p,
       RemoteConfig c,
-      VisibleRefFilter.Factory rff,
       CredentialsFactory cpFactory,
       PerThreadRequestScope.Scoper ts,
       ReplicationQueue rq,
       IdGenerator ig,
-      ReplicationStateListener sl,
+      ReplicationStateListeners sl,
       ReplicationMetrics m,
+      ProjectCache pc,
+      TransportFactory tf,
       @Assisted Project.NameKey d,
       @Assisted URIish u) {
     gitManager = grm;
     this.permissionBackend = permissionBackend;
     pool = p;
     config = c;
-    refFilterFactory = rff;
     credentialsProvider = cpFactory.create(c.getName());
     threadScoper = ts;
     replicationQueue = rq;
@@ -143,13 +148,20 @@
     stateLog = sl;
     createdAt = System.nanoTime();
     metrics = m;
+    projectCache = pc;
     canceledWhileRunning = new AtomicBoolean(false);
     maxRetries = p.getMaxRetries();
+    transportFactory = tf;
+  }
+
+  @Inject(optional = true)
+  public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    this.replicationPushFilter = replicationPushFilter;
   }
 
   @Override
   public void cancel() {
-    repLog.info("Replication [{}] to {} was canceled", IdGenerator.format(id), getURI());
+    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
@@ -158,7 +170,7 @@
   public void setCanceledWhileRunning() {
     repLog.info(
         "Replication [{}] to {} was canceled while being executed",
-        IdGenerator.format(id),
+        HexFormat.fromInt(id),
         getURI());
     canceledWhileRunning.set(true);
   }
@@ -180,7 +192,7 @@
 
   @Override
   public String toString() {
-    String print = "[" + IdGenerator.format(id) + "] push " + uri;
+    String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
 
     if (retryCount > 0) {
       print = "(retry " + retryCount + ") " + print;
@@ -301,7 +313,7 @@
     // we start replication (instead a new instance, with the same URI, is
     // created and scheduled for a future point in time.)
     //
-    MDC.put(ID_MDC_KEY, IdGenerator.format(id));
+    MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
     RunwayStatus status = pool.requestRunway(this);
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
@@ -310,7 +322,7 @@
         repLog.info(
             "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
             uri,
-            IdGenerator.format(status.getInFlightPushId()));
+            HexFormat.fromInt(status.getInFlightPushId()));
         pool.reschedule(this, Destination.RetryReason.COLLISION);
       }
       return;
@@ -407,15 +419,12 @@
     if (pool.isCreateMissingRepos()) {
       try {
         Ref head = git.exactRef(Constants.HEAD);
-        if (replicationQueue.createProject(projectName, head != null ? getName(head) : null)) {
+        if (replicationQueue.createProject(
+            config.getName(), projectName, head != null ? getName(head) : null)) {
           repLog.warn("Missing repository created; retry replication to {}", uri);
           pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
         } else {
-          repLog.warn(
-              "Missing repository could not be created when replicating {}. "
-                  + "You can only create missing repositories locally, over SSH or when "
-                  + "using adminUrl in replication.config. See documentation for more information.",
-              uri);
+          repLog.warn("Missing repository could not be created when replicating {}", uri);
         }
       } catch (IOException ioe) {
         stateLog.error(
@@ -438,7 +447,7 @@
 
   private void runImpl() throws IOException, PermissionBackendException {
     PushResult res;
-    try (Transport tn = Transport.open(git, uri)) {
+    try (Transport tn = transportFactory.open(git, uri)) {
       res = pushVia(tn);
     }
     updateStates(res.getRemoteUpdates());
@@ -465,19 +474,19 @@
 
   private List<RemoteRefUpdate> generateUpdates(Transport tn)
       throws IOException, PermissionBackendException {
-    ProjectControl pc;
-    try {
-      pc = pool.controlFor(projectName);
-    } catch (NoSuchProjectException e) {
+    ProjectState projectState = projectCache.checkedGet(projectName);
+    if (projectState == null) {
       return Collections.emptyList();
     }
 
     Map<String, Ref> local = git.getAllRefs();
     boolean filter;
+    PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
     try {
-      permissionBackend.user(pc.getUser()).project(projectName).check(ProjectPermission.READ);
+      projectState.checkStatePermitsRead();
+      forProject.check(ProjectPermission.READ);
       filter = false;
-    } catch (AuthException e) {
+    } catch (AuthException | ResourceConflictException e) {
       filter = true;
     }
     if (filter) {
@@ -494,10 +503,15 @@
         }
         local = n;
       }
-      local = refFilterFactory.create(pc.getProjectState(), git).filter(local, true);
+      local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
     }
 
-    return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+    List<RemoteRefUpdate> remoteUpdatesList =
+        pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+
+    return replicationPushFilter == null || replicationPushFilter.get() == null
+        ? remoteUpdatesList
+        : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
   }
 
   private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index c71a792..ae0662d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -14,7 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.gerrit.common.EventDispatcher;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.RefEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.server.OrmException;
@@ -23,8 +24,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class PushResultProcessing {
 
@@ -159,7 +158,7 @@
   }
 
   public static class GitUpdateProcessing extends PushResultProcessing {
-    private static final Logger log = LoggerFactory.getLogger(GitUpdateProcessing.class);
+    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
     private final EventDispatcher dispatcher;
 
@@ -189,7 +188,7 @@
       try {
         dispatcher.postEvent(event);
       } catch (OrmException | PermissionBackendException e) {
-        log.error("Cannot post event", e);
+        logger.atSevere().withCause(e).log("Cannot post event");
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
index 91fce7f..c3556af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
@@ -16,18 +16,11 @@
 
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.account.GroupMembership;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 
 public class RemoteSiteUser extends CurrentUser {
-  public interface Factory {
-    RemoteSiteUser create(@Assisted GroupMembership authGroups);
-  }
-
   private final GroupMembership effectiveGroups;
 
-  @Inject
-  RemoteSiteUser(@Assisted GroupMembership authGroups) {
+  public RemoteSiteUser(GroupMembership authGroups) {
     effectiveGroups = authGroups;
   }
 
@@ -35,4 +28,10 @@
   public GroupMembership getEffectiveGroups() {
     return effectiveGroups;
   }
+
+  @Override
+  public Object getCacheKey() {
+    // Never cache a remote user
+    return new Object();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
new file mode 100644
index 0000000..ee9d4c0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.QuotedString;
+
+public class RemoteSsh implements AdminApi {
+
+  private final SshHelper sshHelper;
+  private URIish uri;
+
+  RemoteSsh(SshHelper sshHelper, URIish uri) {
+    this.sshHelper = sshHelper;
+    this.uri = uri;
+  }
+
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
+    if (head != null) {
+      cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
+    }
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Created remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error creating remote repository at {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "rm -rf " + quotedPath;
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Deleted remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error deleting remote repository at {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd =
+        "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+    } catch (IOException e) {
+      repLog.error(
+          "Error updating HEAD of remote repository at {} to {}:\n"
+              + "  Exception: {}\n"
+              + "  Command: {}\n"
+              + "  Output: {}",
+          uri,
+          newHead,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 869a49b..c9531e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.server.git.WorkQueue;
+import java.nio.file.Path;
 import java.util.List;
 
 public interface ReplicationConfig {
@@ -32,6 +33,8 @@
 
   boolean isEmpty();
 
+  Path getEventsDirectory();
+
   int shutdown();
 
   void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
new file mode 100644
index 0000000..b92a54a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+/**
+ * Gerrit libModule for applying a ref-filter for outgoing replications.
+ *
+ * <p>It should be used only when an actual filter is defined, otherwise the default replication
+ * plugin behaviour will be pushing all refs without any filtering.
+ */
+public class ReplicationExtensionPointModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.itemOf(binder(), ReplicationPushFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index db9f35d..9004968 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,8 +17,11 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
@@ -38,28 +41,31 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
-  private static final Logger log = LoggerFactory.getLogger(ReplicationFileBasedConfig.class);
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
   private List<Destination> destinations;
+  private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
   private boolean defaultForceUpdate;
   private int sshCommandTimeout;
   private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
   private final FileBasedConfig config;
+  private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory)
+  public ReplicationFileBasedConfig(
+      SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
+    this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
     this.destinations = allDestinations(destinationFactory);
+    this.pluginDataDir = pluginDataDir;
   }
 
   /*
@@ -86,14 +92,14 @@
     return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
   }
 
-  private List<Destination> allDestinations(DestinationFactory destinationFactory)
+  private List<Destination> allDestinations(Destination.Factory destinationFactory)
       throws ConfigInvalidException, IOException {
     if (!config.getFile().exists()) {
-      log.warn("Config file {} does not exist; not replicating", config.getFile());
+      logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
       return Collections.emptyList();
     }
     if (config.getFile().length() == 0) {
-      log.info("Config file {} is empty; not replicating", config.getFile());
+      logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
       return Collections.emptyList();
     }
 
@@ -107,7 +113,7 @@
           String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
     }
 
-    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", true);
+    replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
 
     defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
 
@@ -199,6 +205,15 @@
     return destinations.isEmpty();
   }
 
+  @Override
+  public Path getEventsDirectory() {
+    String eventsDirectory = config.getString("replication", null, "eventsDirectory");
+    if (!Strings.isNullOrEmpty(eventsDirectory)) {
+      return site.resolve(eventsDirectory);
+    }
+    return pluginDataDir;
+  }
+
   Path getCfgPath() {
     return cfgPath;
   }
@@ -207,11 +222,54 @@
   public int shutdown() {
     int discarded = 0;
     for (Destination cfg : destinations) {
-      discarded += cfg.shutdown();
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        discarded += cfg.shutdown();
+      }
     }
     return discarded;
   }
 
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        Thread.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
   FileBasedConfig getConfig() {
     return config;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 7b3486b..05bbb03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
+import com.google.gerrit.reviewdb.client.Project;
 import java.util.Collections;
 import java.util.List;
 
@@ -46,7 +46,7 @@
     projectPatterns = patterns;
   }
 
-  public boolean matches(NameKey name) {
+  public boolean matches(Project.NameKey name) {
     if (projectPatterns.isEmpty()) {
       return true;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f30e13d..5fdb375 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -21,8 +21,8 @@
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.AbstractModule;
@@ -34,11 +34,13 @@
 class ReplicationModule extends AbstractModule {
   @Override
   protected void configure() {
-    bind(DestinationFactory.class).in(Scopes.SINGLETON);
+    install(new FactoryModuleBuilder().build(Destination.Factory.class));
     bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+    bind(LifecycleListener.class)
+        .annotatedWith(UniqueAnnotations.create())
+        .to(ReplicationQueue.class);
 
     DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
-    DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
 
@@ -55,14 +57,20 @@
         .to(StartReplicationCapability.class);
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
-    install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
 
     bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
-    bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
+    DynamicSet.setOf(binder(), ReplicationStateListener.class);
+    DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
 
     EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class);
     EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class);
     EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class);
     bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class);
+
+    DynamicItem.itemOf(binder(), AdminApiFactory.class);
+    DynamicItem.bind(binder(), AdminApiFactory.class)
+        .to(AdminApiFactory.DefaultAdminApiFactory.class);
+
+    bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
new file mode 100644
index 0000000..eb6ba90
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * Filter that is invoked before list of remote ref updates is pushed to remote instance.
+ *
+ * <p>It can be used to filter out unwanted updates.
+ */
+@ExtensionPoint
+public interface ReplicationPushFilter {
+
+  public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4c7bdfc..d73ab7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,34 +14,33 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import com.google.gerrit.common.EventDispatcher;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
-import org.eclipse.jgit.internal.storage.file.FileRepository;
-import org.eclipse.jgit.lib.Constants;
-import org.eclipse.jgit.lib.RefUpdate;
-import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.URIish;
-import org.eclipse.jgit.util.QuotedString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +48,6 @@
 public class ReplicationQueue
     implements LifecycleListener,
         GitReferenceUpdatedListener,
-        NewProjectCreatedListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
   static final String REPLICATION_LOG_NAME = "replication_log";
@@ -70,35 +68,39 @@
   }
 
   private final WorkQueue workQueue;
-  private final SshHelper sshHelper;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final ReplicationConfig config;
-  private final GerritSshApi gerritAdmin;
+  private final DynamicItem<AdminApiFactory> adminApiFactory;
+  private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
-  private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
+  private volatile boolean replaying;
+  private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
 
   @Inject
   ReplicationQueue(
       WorkQueue wq,
-      SshHelper sh,
-      GerritSshApi ga,
+      DynamicItem<AdminApiFactory> aaf,
       ReplicationConfig rc,
       DynamicItem<EventDispatcher> dis,
-      ReplicationStateListener sl) {
+      ReplicationStateListeners sl,
+      ReplicationTasksStorage rts) {
     workQueue = wq;
-    sshHelper = sh;
     dispatcher = dis;
     config = rc;
     stateLog = sl;
-    gerritAdmin = ga;
+    adminApiFactory = aaf;
+    replicationTasksStorage = rts;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
   }
 
   @Override
   public void start() {
-    config.startup(workQueue);
-    running = true;
-    fireBeforeStartupEvents();
+    if (!running) {
+      config.startup(workQueue);
+      running = true;
+      firePendingEvents();
+      fireBeforeStartupEvents();
+    }
   }
 
   @Override
@@ -110,11 +112,20 @@
     }
   }
 
+  public boolean isRunning() {
+    return running;
+  }
+
+  public boolean isReplaying() {
+    return replaying;
+  }
+
   void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
     scheduleFullSync(project, urlMatch, state, false);
   }
 
-  void scheduleFullSync(
+  @VisibleForTesting
+  public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
     if (!running) {
       stateLog.warn("Replication plugin did not finish startup before event", state);
@@ -125,6 +136,9 @@
       if (cfg.wouldPushProject(project)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
           cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(
+                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
         }
       }
     }
@@ -132,38 +146,53 @@
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
+    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+  }
+
+  private void onGitReferenceUpdated(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(event);
+      beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(event.getProjectName());
+    Project.NameKey project = new Project.NameKey(projectName);
     for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+      if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
         for (URIish uri : cfg.getURIs(project, null)) {
-          cfg.schedule(project, event.getRefName(), uri, state);
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
+          cfg.schedule(project, refName, uri, state);
         }
       }
     }
     state.markAllPushTasksScheduled();
   }
 
-  @Override
-  public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
-    Project.NameKey projectName = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(projectName, FilterType.PROJECT_CREATION)) {
-      createProject(uri, projectName, event.getHeadName());
+  private void firePendingEvents() {
+    try {
+      Set<String> eventsReplayed = new HashSet<>();
+      replaying = true;
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+        String eventKey = String.format("%s:%s", t.project, t.ref);
+        if (!eventsReplayed.contains(eventKey)) {
+          repLog.info("Firing pending task {}", eventKey);
+          onGitReferenceUpdated(t.project, t.ref);
+          eventsReplayed.add(eventKey);
+        }
+      }
+    } finally {
+      replaying = false;
     }
   }
 
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey projectName = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(projectName, FilterType.PROJECT_DELETION)) {
+    for (URIish uri : getURIs(null, projectName, FilterType.PROJECT_DELETION)) {
       deleteProject(uri, projectName);
     }
   }
@@ -171,24 +200,25 @@
   @Override
   public void onHeadUpdated(HeadUpdatedListener.Event event) {
     Project.NameKey project = new Project.NameKey(event.getProjectName());
-    for (URIish uri : getURIs(project, FilterType.ALL)) {
+    for (URIish uri : getURIs(null, project, FilterType.ALL)) {
       updateHead(uri, project, event.getNewHeadName());
     }
   }
 
   private void fireBeforeStartupEvents() {
     Set<String> eventsReplayed = new HashSet<>();
-    for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) {
+    for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
       String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event);
+        onGitReferenceUpdated(event.getProjectName(), event.getRefName());
         eventsReplayed.add(eventKey);
       }
     }
   }
 
-  private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) {
+  private Set<URIish> getURIs(
+      @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
     if (config.getDestinations(filterType).isEmpty()) {
       return Collections.emptySet();
     }
@@ -203,6 +233,10 @@
         continue;
       }
 
+      if (remoteName != null && !config.getRemoteConfigName().equals(remoteName)) {
+        continue;
+      }
+
       boolean adminURLUsed = false;
 
       for (String url : config.getAdminUrls()) {
@@ -245,201 +279,75 @@
     return uris;
   }
 
-  public boolean createProject(Project.NameKey project, String head) {
+  public boolean createProject(String remoteName, Project.NameKey project, String head) {
     boolean success = true;
-    for (URIish uri : getURIs(project, FilterType.PROJECT_CREATION)) {
+    for (URIish uri : getURIs(remoteName, project, FilterType.PROJECT_CREATION)) {
       success &= createProject(uri, project, head);
     }
     return success;
   }
 
   private boolean createProject(URIish replicateURI, Project.NameKey projectName, String head) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.createProject(replicateURI, projectName, head);
-    } else if (!replicateURI.isRemote()) {
-      createLocally(replicateURI, head);
-    } else if (isSSH(replicateURI)) {
-      createRemoteSsh(replicateURI, head);
-    } else {
-      repLog.warn(
-          "Cannot create new project on remote site {}."
-              + " Only local paths and SSH URLs are supported"
-              + " for remote repository creation",
-          replicateURI);
-      return false;
-    }
-    return true;
-  }
-
-  private static void createLocally(URIish uri, String head) {
-    try (Repository repo = new FileRepository(uri.getPath())) {
-      repo.create(true /* bare */);
-
-      if (head != null && head.startsWith(Constants.R_REFS)) {
-        RefUpdate u = repo.updateRef(Constants.HEAD);
-        u.disableRefLog();
-        u.link(head);
-      }
-      repLog.info("Created local repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error("Error creating local repository {}:\n", uri.getPath(), e);
-    }
-  }
-
-  private void createRemoteSsh(URIish uri, String head) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
-    if (head != null) {
-      cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
-    }
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Created remote repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error(
-          "Error creating remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.deleteProject(replicateURI, projectName);
-      repLog.info("Deleted remote repository: " + replicateURI);
-    } else if (!replicateURI.isRemote()) {
-      deleteLocally(replicateURI);
-    } else if (isSSH(replicateURI)) {
-      deleteRemoteSsh(replicateURI);
-    } else {
-      repLog.warn(
-          "Cannot delete project on remote site {}. "
-              + "Only local paths and SSH URLs are supported"
-              + " for remote repository deletion",
-          replicateURI);
-    }
-  }
-
-  private static void deleteLocally(URIish uri) {
-    try {
-      recursivelyDelete(new File(uri.getPath()));
-      repLog.info("Deleted local repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
-    }
-  }
-
-  private static void recursivelyDelete(File dir) throws IOException {
-    File[] contents = dir.listFiles();
-    if (contents != null) {
-      for (File d : contents) {
-        if (d.isDirectory()) {
-          recursivelyDelete(d);
-        } else {
-          if (!d.delete()) {
-            throw new IOException("Failed to delete: " + d.getAbsolutePath());
-          }
-        }
-      }
-    }
-    if (!dir.delete()) {
-      throw new IOException("Failed to delete: " + dir.getAbsolutePath());
-    }
-  }
-
-  private void deleteRemoteSsh(URIish uri) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd = "rm -rf " + quotedPath;
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-      repLog.info("Deleted remote repository: {}", uri);
-    } catch (IOException e) {
-      repLog.error(
-          "Error deleting remote repository at {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
-    if (isGerrit(replicateURI)) {
-      gerritAdmin.updateHead(replicateURI, projectName, newHead);
-    } else if (!replicateURI.isRemote()) {
-      updateHeadLocally(replicateURI, newHead);
-    } else if (isSSH(replicateURI)) {
-      updateHeadRemoteSsh(replicateURI, newHead);
-    } else {
-      repLog.warn(
-          "Cannot update HEAD of project on remote site {}."
-              + " Only local paths and SSH URLs are supported"
-              + " for remote HEAD update.",
-          replicateURI);
-    }
-  }
-
-  private void updateHeadRemoteSsh(URIish uri, String newHead) {
-    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
-    String cmd =
-        "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
-    OutputStream errStream = sshHelper.newErrorBufferStream();
-    try {
-      sshHelper.executeRemoteSsh(uri, cmd, errStream);
-    } catch (IOException e) {
-      repLog.error(
-          "Error updating HEAD of remote repository at {} to {}:\n"
-              + "  Exception: {}\n"
-              + "  Command: {}\n"
-              + "  Output: {}",
-          uri,
-          newHead,
-          e,
-          cmd,
-          errStream,
-          e);
-    }
-  }
-
-  private static void updateHeadLocally(URIish uri, String newHead) {
-    try (Repository repo = new FileRepository(uri.getPath())) {
-      if (newHead != null) {
-        RefUpdate u = repo.updateRef(Constants.HEAD);
-        u.link(newHead);
-      }
-    } catch (IOException e) {
-      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
-    }
-  }
-
-  private static boolean isSSH(URIish uri) {
-    String scheme = uri.getScheme();
-    if (!uri.isRemote()) {
-      return false;
-    }
-    if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent() && adminApi.get().createProject(projectName, head)) {
       return true;
     }
-    if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
-      return true;
-    }
+
+    warnCannotPerform("create new project", replicateURI);
     return false;
   }
 
-  private static boolean isGerrit(URIish uri) {
-    String scheme = uri.getScheme();
-    return scheme != null && scheme.toLowerCase().equals("gerrit+ssh");
+  private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent()) {
+      adminApi.get().deleteProject(projectName);
+      return;
+    }
+
+    warnCannotPerform("delete project", replicateURI);
+  }
+
+  private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
+    Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+    if (adminApi.isPresent()) {
+      adminApi.get().updateHead(projectName, newHead);
+      return;
+    }
+
+    warnCannotPerform("update HEAD of project", replicateURI);
+  }
+
+  private void warnCannotPerform(String op, URIish uri) {
+    repLog.warn("Cannot {} on remote site {}.", op, uri);
+  }
+
+  private static class ReferenceUpdatedEvent {
+    private String projectName;
+    private String refName;
+
+    public ReferenceUpdatedEvent(String projectName, String refName) {
+      this.projectName = projectName;
+      this.refName = refName;
+    }
+
+    public String getProjectName() {
+      return projectName;
+    }
+
+    public String getRefName() {
+      return refName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectName, refName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (obj instanceof ReferenceUpdatedEvent)
+          && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName)
+          && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName);
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index cd7a3cf..aa965fe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
@@ -38,7 +37,7 @@
   }
 
   @Override
-  public NameKey getProjectNameKey() {
+  public Project.NameKey getProjectNameKey() {
     return new Project.NameKey(project);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 86557e2..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -23,6 +23,7 @@
 import org.eclipse.jgit.transport.URIish;
 
 public class ReplicationState {
+
   private boolean allScheduled;
   private final PushResultProcessing pushResultProcessing;
 
@@ -49,7 +50,7 @@
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
-  public ReplicationState(PushResultProcessing processing) {
+  ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
     statusByProjectRef = HashBasedTable.create();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
new file mode 100644
index 0000000..d7bf227
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+
+public class ReplicationStateListeners implements ReplicationStateListener {
+  private final DynamicSet<ReplicationStateListener> listeners;
+
+  @Inject
+  ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) {
+    this.listeners = stateListeners;
+  }
+
+  @Override
+  public void warn(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.warn(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, Throwable t, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, t, states);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
new file mode 100644
index 0000000..64397f9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -0,0 +1,137 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class ReplicationTasksStorage {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private boolean disableDeleteForTesting;
+
+  public static class ReplicateRefUpdate {
+    public final String project;
+    public final String ref;
+    public final String uri;
+    public final String remote;
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this.project = project;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
+      this.remote = remote;
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+    }
+  }
+
+  private static Gson GSON = new Gson();
+
+  private final Path refUpdates;
+
+  @Inject
+  ReplicationTasksStorage(ReplicationConfig config) {
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+  }
+
+  public String persist(ReplicateRefUpdate r) {
+    String json = GSON.toJson(r) + "\n";
+    String eventKey = sha1(json).name();
+    Path file = refUpdates().resolve(eventKey);
+
+    if (Files.exists(file)) {
+      return eventKey;
+    }
+
+    try {
+      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.write(file, json.getBytes(UTF_8));
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
+    }
+    return eventKey;
+  }
+
+  @VisibleForTesting
+  public void disableDeleteForTesting(boolean deleteDisabled) {
+    this.disableDeleteForTesting = deleteDisabled;
+  }
+
+  public void delete(ReplicateRefUpdate r) {
+    String taskJson = GSON.toJson(r) + "\n";
+    String taskKey = sha1(taskJson).name();
+    Path file = refUpdates().resolve(taskKey);
+
+    if (disableDeleteForTesting) {
+      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
+      return;
+    }
+
+    try {
+      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.delete(file);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+    }
+  }
+
+  public List<ReplicateRefUpdate> list() {
+    ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+      for (Path e : events) {
+        if (Files.isRegularFile(e)) {
+          String json = new String(Files.readAllBytes(e), UTF_8);
+          result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error when firing pending events");
+    }
+    return result;
+  }
+
+  @SuppressWarnings("deprecation")
+  private ObjectId sha1(String s) {
+    return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+  }
+
+  private Path refUpdates() {
+    try {
+      return Files.createDirectories(refUpdates);
+    } catch (IOException e) {
+      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 2b0c16b..c518091 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -17,6 +17,7 @@
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import java.io.IOException;
+import java.util.Objects;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -49,8 +50,8 @@
 
   @Override
   public SecureCredentialsProvider create(String remoteName) {
-    String user = config.getString("remote", remoteName, "username");
-    String pass = config.getString("remote", remoteName, "password");
+    String user = Objects.toString(config.getString("remote", remoteName, "username"), "");
+    String pass = Objects.toString(config.getString("remote", remoteName, "password"), "");
     return new SecureCredentialsProvider(user, pass);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
index c4294a9..62b4036 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
@@ -20,7 +20,7 @@
 import org.eclipse.jgit.transport.URIish;
 
 /** Looks up a remote's password in secure.config. */
-class SecureCredentialsProvider extends CredentialsProvider {
+public class SecureCredentialsProvider extends CredentialsProvider {
   private final String cfgUser;
   private final String cfgPass;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
new file mode 100644
index 0000000..ba14299
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public interface TransportFactory {
+
+  Transport open(Repository local, URIish uri) throws NotSupportedException, TransportException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
new file mode 100644
index 0000000..58c1214
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public class TransportFactoryImpl implements TransportFactory {
+
+  @Override
+  public Transport open(Repository git, URIish uri)
+      throws NotSupportedException, TransportException {
+    return Transport.open(git, uri);
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e70094c..c9356b0 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -13,6 +13,11 @@
   sudo su -c 'ssh mirror1.us.some.org echo' gerrit2
 ```
 
+*NOTE:* make sure the local user's ssh keys format is PEM, here how to generate them:
+```
+  ssh-keygen -m PEM -t rsa -C "your_email@example.com"
+```
+
 <a name="example_file">
 Next, create `$site_path/etc/replication.config` as a Git-style config
 file, for example to replicate in parallel to four different hosts:</a>
@@ -65,7 +70,7 @@
 
 gerrit.replicateOnStartup
 :	If true, replicates to all remotes on startup to ensure they
-	are in-sync with this server.  By default, true.
+	are in-sync with this server.  By default, false.
 
 gerrit.autoReload
 :	If true, automatically reloads replication destinations and settings
@@ -124,6 +129,17 @@
 
 	By default, pushes are retried indefinitely.
 
+replication.eventsDirectory
+: Directory where replication events are persisted
+
+	When scheduling a replication, the replication event is persisted
+	under this directory. When the replication is done, the event is deleted.
+	If plugin is stopped before all scheduled replications are done, the
+	persisted events will not be deleted. When the plugin is started again,
+	it will trigger all replications found under this directory.
+
+	When not set, defaults to the plugin's data directory.
+
 remote.NAME.url
 :	Address of the remote server to push to.  Multiple URLs may be
 	specified within a single remote block, listing different
@@ -272,6 +288,20 @@
 
 	By default, use replication.maxRetries.
 
+remote.NAME.drainQueueAttempts
+:	Maximum number of attempts to drain the replication event queue before
+	stopping the plugin.
+
+	When stopping the plugin, the shutdown will be delayed trying to drain
+	the event queue.
+
+	The maximum delay is "drainQueueAttempts" * "replicationDelay" seconds.
+
+	When not set or set to 0, the queue is not drained and the pending
+	replication events are cancelled.
+
+	By default, do not drain replication events.
+
 remote.NAME.threads
 :	Number of worker threads to dedicate to pushing to the
 	repositories described by this remote.  Each thread can push
@@ -298,7 +328,7 @@
 	If the remote site was not available at the moment when a new
 	project was created, it will be created if during the replication
 	of a ref it is found to be missing.
-	
+
 	If false, repositories are never created automatically on this
 	remote.
 
@@ -398,7 +428,7 @@
 File `~/.ssh/config`
 --------------------
 
-If present, Gerrit reads and caches `~/.ssh/config` at startup, and
+Gerrit reads and caches the `~/.ssh/config` at startup, and
 supports most SSH configuration options.  For example:
 
 ```
@@ -412,6 +442,15 @@
     PreferredAuthentications publickey
 ```
 
+*IdentityFile* and *PreferredAuthentications* must be defined for all the hosts.
+Here an example of the minimum `~/.ssh/config` needed:
+
+```
+  Host *
+    IdentityFile ~/.ssh/id_rsa
+    PreferredAuthentications publickey
+```
+
 Supported options:
 
   * Host
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
new file mode 100644
index 0000000..076aded
--- /dev/null
+++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,53 @@
+@PLUGIN@ extension points
+==============
+
+The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+implemented by another plugin by declaring a `provided` dependency from the replication plugin.
+
+### Install extension libModule
+
+The replication plugin's extension points are defined in the `c.g.g.p.r.ReplicationExtensionPointModule`
+that needs to be configured as libModule.
+
+Create a symbolic link from `$GERRIT_SITE/plugins/replication.jar` into `$GERRIT_SITE/lib`
+and then add the replication extension module to the `gerrit.config`.
+
+Example:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+> **NOTE**: Use and configuration of the replication plugin as library module requires a Gerrit server restart and does not support hot plugin install or upgrade.
+
+
+### Extension points
+
+* `com.googlesource.gerrit.plugins.replication.ReplicationPushFilter`
+
+  Filter out the ref updates pushed to a remote instance.
+  Only one filter at a time is supported. Filter implementation needs to bind a `DynamicItem`.
+
+  Default: no filtering
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(), ReplicationPushFilter.class).to(ReplicationPushFilterImpl.class);
+  ```
+
+* `com.googlesource.gerrit.plugins.replication.AdminApiFactory`
+
+  Create an instance of `AdminApi` for a given remote URL. The default implementation
+  provides API instances for local FS, remote SSH, and remote Gerrit.
+
+  Only one factory at a time is supported. The implementation needs to be bound as a
+  `DynamicItem`.
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(), AdminApiFactory.class).to(AdminApiFactoryImpl.class);
+  ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 337bd1d..5fa7b98 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -22,8 +22,8 @@
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
-import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.client.KeyUtil;
 import com.google.gwtorm.server.OrmException;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
new file mode 100644
index 0000000..af065b3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -0,0 +1,375 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.IdGenerator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.AssertionFailedError;
+import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.PushConnection;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PushOneTest {
+  private static final int TEST_PUSH_TIMEOUT_SECS = 10;
+
+  private GitRepositoryManager gitRepositoryManagerMock;
+  private Repository repositoryMock;
+  private PermissionBackend permissionBackendMock;
+  private PermissionBackend.WithUser withUserMock;
+  private PermissionBackend.ForProject forProjectMock;
+
+  private Destination destinationMock;
+  private RemoteConfig remoteConfigMock;
+  private RefSpec refSpecMock;
+  private CredentialsFactory credentialsFactory;
+  private PerThreadRequestScope.Scoper threadRequestScoperMock;
+  private ReplicationQueue replicationQueueMock;
+  private IdGenerator idGeneratorMock;
+  private ReplicationStateListeners replicationStateListenersMock;
+  private ReplicationMetrics replicationMetricsMock;
+  private Timer1.Context timerContextMock;
+  private ProjectCache projectCacheMock;
+  private TransportFactory transportFactoryMock;
+  private Transport transportMock;
+  private FetchConnection fetchConnection;
+  private PushConnection pushConnection;
+  private ProjectState projectStateMock;
+  private RefUpdate refUpdateMock;
+
+  private Project.NameKey projectNameKey;
+  private URIish urish;
+  private Map<String, Ref> localRefs;
+
+  private Map<String, Ref> remoteRefs;
+  private CountDownLatch isCallFinished;
+  private Ref newLocalRef;
+
+  @Before
+  public void setup() throws Exception {
+    projectNameKey = new Project.NameKey("fooProject");
+    urish = new URIish("http://foo.com/fooProject.git");
+
+    newLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
+
+    localRefs = new HashMap<>();
+    localRefs.put("fooProject", newLocalRef);
+
+    Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
+    remoteRefs = new HashMap<>();
+    remoteRefs.put("fooProject", remoteRef);
+
+    isCallFinished = new CountDownLatch(1);
+
+    setupMocks();
+  }
+
+  private void setupMocks() throws Exception {
+    FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
+    config.setString("remote", "Replication", "push", "foo");
+
+    setupRefUpdateMock();
+    setupRepositoryMock(config);
+    setupGitRepoManagerMock();
+
+    projectStateMock = createNiceMock(ProjectState.class);
+    forProjectMock = createNiceMock(ForProject.class);
+    setupWithUserMock();
+    setupPermissionBackedMock();
+
+    setupDestinationMock();
+
+    setupRefSpecMock();
+    setupRemoteConfigMock();
+
+    credentialsFactory = createNiceMock(CredentialsFactory.class);
+
+    setupFetchConnectionMock();
+    setupPushConnectionMock();
+    setupRequestScopeMock();
+    replicationQueueMock = createNiceMock(ReplicationQueue.class);
+    idGeneratorMock = createNiceMock(IdGenerator.class);
+    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+
+    timerContextMock = createNiceMock(Timer1.Context.class);
+    setupReplicationMetricsMock();
+
+    setupTransportMock();
+
+    setupProjectCacheMock();
+
+    replay(
+        gitRepositoryManagerMock,
+        refUpdateMock,
+        repositoryMock,
+        permissionBackendMock,
+        destinationMock,
+        remoteConfigMock,
+        credentialsFactory,
+        threadRequestScoperMock,
+        replicationQueueMock,
+        idGeneratorMock,
+        replicationStateListenersMock,
+        replicationMetricsMock,
+        projectCacheMock,
+        timerContextMock,
+        transportFactoryMock,
+        projectStateMock,
+        withUserMock,
+        forProjectMock,
+        fetchConnection,
+        pushConnection,
+        refSpecMock);
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFilters() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(DynamicItem.itemOf(ReplicationPushFilter.class, null));
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFiltersSetup() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(null);
+  }
+
+  private void shouldPushAllRefsWithDynamicItemFilter(
+      DynamicItem<ReplicationPushFilter> replicationPushFilter)
+      throws IOException, NotSupportedException, TransportException, InterruptedException {
+    List<RemoteRefUpdate> expectedUpdates =
+        Arrays.asList(
+            new RemoteRefUpdate(
+                repositoryMock,
+                newLocalRef.getName(),
+                newLocalRef.getObjectId(),
+                "fooProject",
+                false,
+                "fooProject",
+                null));
+
+    PushResult pushResult = new PushResult();
+
+    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
+        .andReturn(pushResult)
+        .once();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  @Test
+  public void shouldBlockReplicationUsingPushFilter() throws InterruptedException, IOException {
+    DynamicItem<ReplicationPushFilter> replicationPushFilter =
+        DynamicItem.itemOf(
+            ReplicationPushFilter.class,
+            new ReplicationPushFilter() {
+
+              @Override
+              public List<RemoteRefUpdate> filter(
+                  String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+                return Collections.emptyList();
+              }
+            });
+
+    // easymock way to check if method was never called
+    expect(transportMock.push(anyObject(), anyObject()))
+        .andThrow(new AssertionFailedError())
+        .anyTimes();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    PushOne push =
+        new PushOne(
+            gitRepositoryManagerMock,
+            permissionBackendMock,
+            destinationMock,
+            remoteConfigMock,
+            credentialsFactory,
+            threadRequestScoperMock,
+            replicationQueueMock,
+            idGeneratorMock,
+            replicationStateListenersMock,
+            replicationMetricsMock,
+            projectCacheMock,
+            transportFactoryMock,
+            projectNameKey,
+            urish);
+
+    push.setReplicationPushFilter(replicationPushFilter);
+    return push;
+  }
+
+  private void setupProjectCacheMock() throws IOException {
+    projectCacheMock = createNiceMock(ProjectCache.class);
+    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+  }
+
+  private void setupTransportMock() throws NotSupportedException, TransportException {
+    transportMock = createNiceMock(Transport.class);
+    expect(transportMock.openFetch()).andReturn(fetchConnection);
+    transportFactoryMock = createNiceMock(TransportFactory.class);
+    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+  }
+
+  private void setupReplicationMetricsMock() {
+    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
+    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+  }
+
+  private void setupRequestScopeMock() {
+    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
+    expect(threadRequestScoperMock.scope(anyObject()))
+        .andAnswer(
+            new IAnswer<Callable<Object>>() {
+              @SuppressWarnings("unchecked")
+              @Override
+              public Callable<Object> answer() throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+                return new Callable<Object>() {
+
+                  @Override
+                  public Object call() throws Exception {
+                    Object result = originalCall.call();
+                    isCallFinished.countDown();
+                    return result;
+                  }
+                };
+              }
+            })
+        .anyTimes();
+  }
+
+  private void setupPushConnectionMock() {
+    pushConnection = createNiceMock(PushConnection.class);
+    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupFetchConnectionMock() {
+    fetchConnection = createNiceMock(FetchConnection.class);
+    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupRemoteConfigMock() {
+    remoteConfigMock = createNiceMock(RemoteConfig.class);
+    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+  }
+
+  private void setupRefSpecMock() {
+    refSpecMock = createNiceMock(RefSpec.class);
+    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
+    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
+    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
+    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+  }
+
+  private void setupDestinationMock() {
+    destinationMock = createNiceMock(Destination.class);
+    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+  }
+
+  private void setupPermissionBackedMock() {
+    permissionBackendMock = createNiceMock(PermissionBackend.class);
+    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+  }
+
+  private void setupWithUserMock() {
+    withUserMock = createNiceMock(WithUser.class);
+    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+  }
+
+  private void setupGitRepoManagerMock() throws IOException {
+    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
+    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void setupRepositoryMock(FileBasedConfig config) throws IOException {
+    repositoryMock = createNiceMock(Repository.class);
+    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
+    expect(repositoryMock.getAllRefs()).andReturn(localRefs);
+    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+  }
+
+  private void setupRefUpdateMock() {
+    refUpdateMock = createNiceMock(RefUpdate.class);
+    expect(refUpdateMock.getOldObjectId())
+        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
+        .anyTimes();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
new file mode 100644
index 0000000..111a792
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
+  Collection<RemoteRefUpdate> expectedRemoteRefs;
+
+  public static Collection<RemoteRefUpdate> eqRemoteRef(
+      Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
+    return null;
+  }
+
+  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    this.expectedRemoteRefs = expectedRemoteRefs;
+  }
+
+  @Override
+  public boolean matches(Object argument) {
+    if (!(argument instanceof Collection)) return false;
+
+    @SuppressWarnings("unchecked")
+    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
+
+    if (expectedRemoteRefs.size() != refs.size()) return false;
+    return refs.stream()
+        .allMatch(
+            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("expected:" + expectedRemoteRefs.toString());
+  }
+
+  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
+    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
+        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
+        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
+        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
+        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
+        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
+        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
+        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
new file mode 100644
index 0000000..61a53f3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -0,0 +1,377 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.common.ProjectInfo;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    config.save();
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    tasksStorage.disableDeleteForTesting(true);
+  }
+
+  @Test
+  public void shouldReplicateNewProject() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey sourceProject = createTestProject("foo");
+
+    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
+
+    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+
+    ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
+    assertThat(replicaProject).isNotNull();
+  }
+
+  @Test
+  public void shouldReplicateNewChangeRef() throws Exception {
+    Project.NameKey targetProject = createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject("projectreplica1");
+    Project.NameKey targetProject2 = createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject("projectreplica1");
+    createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    PushResultProcessing pushResultProcessing =
+        new PushResultProcessing() {
+
+          @Override
+          void onRefReplicatedToOneNode(
+              String project,
+              String ref,
+              URIish uri,
+              ReplicationState.RefPushResult status,
+              RemoteRefUpdate.Status refStatus) {}
+
+          @Override
+          void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
+
+          @Override
+          void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
+        };
+
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(pushResultProcessing), true);
+
+    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+  }
+
+  @Test
+  public void shouldReplicateHeadUpdate() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newHead = "refs/heads/newhead";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newHead).create(input);
+    gApi.projects().name(project.get()).head(newHead);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+      Ref targetProjectHead = getRef(repo, Constants.HEAD);
+      assertThat(targetProjectHead).isNotNull();
+      assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+    }
+  }
+
+  @Test
+  public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "doNotDrainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThrows(
+        InterruptedException.class,
+        () -> {
+          try (Repository repo = repoManager.openRepository(targetProject)) {
+            waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+          }
+        });
+  }
+
+  @Test
+  public void shouldDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "drainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    config.setInt("remote", remoteName, "drainQueueAttempts", 2);
+    config.save();
+    reloadConfig();
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return createProject(name);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+  }
+
+  private void shutdownConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+  }
+
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+
+  private void cleanupReplicationTasks() throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+      for (Path path : files) {
+        path.toFile().delete();
+      }
+    }
+  }
+
+  private boolean projectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
index 881a282..2a395ca 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -56,13 +56,13 @@
   private FileBasedConfig config;
 
   @Override
-  public void setUp() throws Exception {
+  public void setUpTestPlugin() throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
     config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
 
     setReplicationDestination("foo", "replica");
-    super.setUp();
+    super.setUpTestPlugin();
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
new file mode 100644
index 0000000..586b56c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.function.Supplier;
+
+public class WaitUtil {
+  public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        throw new InterruptedException();
+      }
+      MILLISECONDS.sleep(50);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
new file mode 100644
index 0000000..0ccb0af
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.WaitUtil.waitUntil;
+
+import java.time.Duration;
+import org.junit.Test;
+
+public class WaitUtilTest {
+
+  @Test
+  public void shouldFailWhenConditionNotMetWithinTimeout() throws Exception {
+    assertThrows(
+        InterruptedException.class,
+        () -> waitUntil(() -> returnTrue() == false, Duration.ofSeconds(1)));
+  }
+
+  @Test
+  public void shouldNotFailWhenConditionIsMetWithinTimeout() throws Exception {
+    waitUntil(() -> returnTrue() == true, Duration.ofSeconds(1));
+  }
+
+  private static boolean returnTrue() {
+    return true;
+  }
+}