Merge branch 'stable-3.1' into stable-3.2

* stable-3.1:
  ReplicationStorageIT: Wait for all pushes without order
  Replication*IT: Share getRef method
  ReplicationFanoutIT: Share setReplicationDestination
  ReplicationFanoutIT: Split shouldReplicateNewBranch tests
  ReplicationFanoutIT: Remove generic waitUntil helper
  ReplicationFanoutIT: Inherit from ReplicationDaemon
  ReplicationFanoutIT: Refactor setRemoteReplicationDestination
  ReplicationFanoutIT: Rename setReplicationDestination
  ReplicationFanoutIT: Cleanup shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair
  Move shouldCleanupTasksAfterNewProjectReplication test
  Move storage portion of replicateBranchDeletion ITs
  Refactor Replication*IT tests to share a base class
  ReplicationIT: Add shouldMatch* e2e tests
  ReplicationStorageIT: Move shouldMatch* tests from ReplicationIT
  ReplicationTasksStorage: Add multi-primary unit tests
  ReplicationTasksStorage: Add multi-primary unit tests
  ReplicationStorageIT: Add shouldFire*ChangeRefs tests
  Move storage-based ITs into ReplicationStorageIT
  ReplicationTasksStorage.Task: Add multi-primary unit tests
  ReplicationQueue: Remove unused method

Cleanup specific to stable-3.2 will be done in follow-up changes.

Change-Id: Ib938c661158e8f7a3434010187b87c79e81a01b8
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5a4d56c..b93a83d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,7 +17,6 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
 import com.google.gerrit.entities.Project;
@@ -116,11 +115,6 @@
     return replaying;
   }
 
-  void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
-    scheduleFullSync(project, urlMatch, state, false);
-  }
-
-  @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
     fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
new file mode 100644
index 0000000..420cdf8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -0,0 +1,232 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+/**
+ * This class can be extended by any Replication*IT class and provides common setup and helper
+ * methods.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDaemon extends LightweightPluginDaemonTest {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  protected static final Optional<String> ALL_PROJECTS = Optional.empty();
+
+  protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+  protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
+  protected static final int TEST_PUSH_TIME_SECONDS = 1;
+  protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
+  protected static final Duration TEST_PUSH_TIMEOUT =
+      Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+  protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+              + TEST_PROJECT_CREATION_SECONDS);
+
+  @Inject protected SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  protected Path gitPath;
+  protected FileBasedConfig config;
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS, mirror);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+      throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay)
+      throws IOException {
+    setReplicationDestination(remoteName, replicaSuffixes, project, replicationDelay, false);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  protected void setReplicationDestination(
+      FileBasedConfig config,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay)
+      throws IOException {
+    setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+  }
+
+  protected void setReplicationDestination(
+      FileBasedConfig remoteConfig,
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    remoteConfig.setStringList("remote", remoteName, "url", replicaUrls);
+    remoteConfig.setInt("remote", remoteName, "replicationDelay", replicationDelay);
+    remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
+    remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
+    project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+    remoteConfig.save();
+  }
+
+  protected Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  protected boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
+  protected boolean isPushCompleted(Map<Project.NameKey, String> refsByProject, Duration timeOut) {
+    try {
+      WaitUtil.waitUntil(
+          () -> {
+            Iterator<Map.Entry<Project.NameKey, String>> iterator =
+                refsByProject.entrySet().iterator();
+            while (iterator.hasNext()) {
+              Map.Entry<Project.NameKey, String> entry = iterator.next();
+              try (Repository repo = repoManager.openRepository(entry.getKey())) {
+                if (checkedGetRef(repo, entry.getValue()) != null) {
+                  iterator.remove();
+                }
+              } catch (IOException e) {
+                throw new RuntimeException("Cannot open repo for project" + entry.getKey(), e);
+              }
+            }
+            return refsByProject.isEmpty();
+          },
+          timeOut);
+    } catch (InterruptedException e) {
+      return false;
+    }
+    return true;
+  }
+
+  protected Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  protected Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  protected void reloadConfig() {
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  protected AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  protected <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  protected boolean nonEmptyProjectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return !r.getAllRefsByPeeledObjectId().isEmpty();
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  protected void initConfig() throws IOException {
+    if (config == null) {
+      gitPath = sitePaths.site_path.resolve("git");
+      config =
+          new FileBasedConfig(
+              sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+      config.save();
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index c32a55d..3619add 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -17,32 +17,20 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import com.google.inject.Key;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Supplier;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
@@ -55,37 +43,18 @@
 @TestPlugin(
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationFanoutIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 1;
-  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
-
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
-  private Path pluginDataDir;
-  private Path gitPath;
-  private Path storagePath;
-  private FileBasedConfig config;
+public class ReplicationFanoutIT extends ReplicationDaemon {
   private ReplicationTasksStorage tasksStorage;
 
   @Override
   public void setUpTestPlugin() throws Exception {
-    gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    setAutoReload();
+    initConfig();
+    config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
-
-    setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project"));
+    setReplicationDestinationRemoteConfig("remote1", "suffix1", Optional.of("not-used-project"));
 
     super.setUpTestPlugin();
-
-    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
-    storagePath = pluginDataDir.resolve("ref-updates");
     tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
-    cleanupReplicationTasks();
   }
 
   @After
@@ -98,7 +67,7 @@
 
   @Test
   public void shouldReplicateNewBranch() throws Exception {
-    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    setReplicationDestinationRemoteConfig("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
     Project.NameKey targetProject = createTestProject(project + "replica");
@@ -108,12 +77,9 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(newBranch).create(input);
 
-    assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
-
+    isPushCompleted(targetProject, newBranch, TEST_PUSH_TIMEOUT);
     try (Repository repo = repoManager.openRepository(targetProject);
         Repository sourceRepo = repoManager.openRepository(project)) {
-      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
-
       Ref masterRef = getRef(sourceRepo, master);
       Ref targetBranchRef = getRef(repo, newBranch);
       assertThat(targetBranchRef).isNotNull();
@@ -122,25 +88,39 @@
   }
 
   @Test
+  public void shouldReplicateNewBranchStorage() throws Exception {
+    setReplicationDestinationRemoteConfig("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createTestProject(project + "replica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listWaitingTasks("refs/heads/(mybranch|master)")).hasSize(2);
+  }
+
+  @Test
   public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
     Project.NameKey targetProject1 = createTestProject(project + "replica1");
     Project.NameKey targetProject2 = createTestProject(project + "replica2");
 
-    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
-    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    setReplicationDestinationRemoteConfig("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestinationRemoteConfig("foo2", "replica2", ALL_PROJECTS);
     reloadConfig();
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
-
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
-      waitUntil(
+      WaitUtil.waitUntil(
           () ->
-              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null),
+          TEST_PUSH_TIMEOUT);
 
       Ref targetBranchRef1 = getRef(repo1, sourceRef);
       assertThat(targetBranchRef1).isNotNull();
@@ -153,119 +133,63 @@
   }
 
   @Test
-  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
-    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+  public void shouldReplicateNewBranchToTwoRemotesStorage() throws Exception {
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
 
-    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest1.save();
-    dest2.save();
+    setReplicationDestinationRemoteConfig("foo1", "replica1", ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestinationRemoteConfig("foo2", "replica2", ALL_PROJECTS, Integer.MAX_VALUE);
     reloadConfig();
 
     createChange();
 
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    assertThat(listWaitingTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
   }
 
-  private Ref getRef(Repository repo, String branchName) throws IOException {
-    return repo.getRefDatabase().exactRef(branchName);
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+
+    setReplicationDestinationRemoteConfig("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestinationRemoteConfig("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listWaitingTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
   }
 
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
-  }
-
-  private void setReplicationDestination(
+  private void setReplicationDestinationRemoteConfig(
       String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
-    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+    setReplicationDestinationRemoteConfig(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS);
   }
 
-  private FileBasedConfig setReplicationDestination(
-      String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
+  private void setReplicationDestinationRemoteConfig(
+      String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+      throws IOException {
+    setReplicationDestinationRemoteConfig(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+  }
+
+  private void setReplicationDestinationRemoteConfig(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> allProjects,
+      int replicationDelay)
       throws IOException {
     FileBasedConfig remoteConfig =
         new FileBasedConfig(
             sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(),
             FS.DETECTED);
 
-    setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
-    return remoteConfig;
+    setReplicationDestination(remoteConfig, replicaSuffixes, allProjects, replicationDelay);
   }
 
-  private void setAutoReload() throws IOException {
-    config.setBoolean("gerrit", null, "autoReload", true);
-    config.save();
-  }
-
-  private void setReplicationDestination(
-      FileBasedConfig config, List<String> replicaSuffixes, Optional<String> project)
-      throws IOException {
-
-    List<String> replicaUrls =
-        replicaSuffixes.stream()
-            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
-            .collect(toList());
-    config.setStringList("remote", null, "url", replicaUrls);
-    config.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
-    project.ifPresent(prj -> config.setString("remote", null, "projects", prj));
-
-    config.save();
-  }
-
-  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
-    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
-  }
-
-  private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().reload();
-  }
-
-  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
-    return getInstance(AutoReloadConfigDecorator.class);
-  }
-
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
-
-  @SuppressWarnings(
-      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
-  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
+  private List<ReplicateRefUpdate> listWaitingTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    synchronized (tasksStorage) {
-      return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream())
-          .filter(task -> refmaskPattern.matcher(task.ref).matches())
-          .collect(toList());
-    }
-  }
-
-  public void cleanupReplicationTasks() throws IOException {
-    cleanupReplicationTasks(storagePath);
-  }
-
-  private void cleanupReplicationTasks(Path basePath) throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
-      for (Path path : files) {
-        if (Files.isDirectory(path)) {
-          cleanupReplicationTasks(path);
-        } else {
-          path.toFile().delete();
-        }
-      }
-    }
+    return tasksStorage.listWaiting().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index ecc71ba..2a7ff19 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -17,14 +17,10 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
@@ -32,40 +28,32 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.eclipse.jgit.transport.URIish;
-import org.eclipse.jgit.util.FS;
 import org.junit.Test;
 
 @UseLocalDisk
 @TestPlugin(
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class ReplicationIT extends ReplicationDaemon {
   private static final int TEST_REPLICATION_DELAY = 1;
   private static final int TEST_REPLICATION_RETRY = 1;
   private static final int TEST_REPLICATION_MAX_RETRIES = 1;
@@ -76,35 +64,21 @@
       Duration.ofSeconds(
           (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) * TEST_REPLICATION_MAX_RETRIES
               + 10);
-  private static final int TEST_PROJECT_CREATION_SECONDS = 10;
 
-  private static final Duration TEST_NEW_PROJECT_TIMEOUT =
-      Duration.ofSeconds(
-          (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + TEST_PROJECT_CREATION_SECONDS);
-
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
   private DestinationsCollection destinationCollection;
   private Path pluginDataDir;
-  private Path gitPath;
   private Path storagePath;
-  private FileBasedConfig config;
   private ReplicationConfig replicationConfig;
   private ReplicationTasksStorage tasksStorage;
 
   @Override
   public void setUpTestPlugin() throws Exception {
-    gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    initConfig();
     setReplicationDestination(
         "remote1",
         "suffix1",
         Optional.of("not-used-project")); // Simulates a full replication.config initialization
-    config.save();
-
     super.setUpTestPlugin();
 
     pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
@@ -231,78 +205,55 @@
   }
 
   @Test
-  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
-    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
-    createTestProject(project + "replica1");
-    createTestProject(project + "replica2");
-
-    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest1.save();
-    dest2.save();
-    reloadConfig();
-
-    createChange();
-
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-  }
-
-  @Test
-  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
-    createTestProject(project + "replica");
-
-    setReplicationDestination("foo", "replica", ALL_PROJECTS);
-    reloadConfig();
-
-    plugin
-        .getSysInjector()
-        .getInstance(ReplicationQueue.class)
-        .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
-
-    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
-  }
-
-  @Test
   public void shouldMatchTemplatedURL() throws Exception {
-    createTestProject(project + "replica");
+    Project.NameKey targetProject = createTestProject(project + "replica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
-    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
-    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+    }
   }
 
   @Test
   public void shouldMatchRealURL() throws Exception {
-    createTestProject(project + "replica");
+    Project.NameKey targetProject = createTestProject(project + "replica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
-    String expectedURI = urlMatch;
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks()).hasSize(1);
-    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+    }
   }
 
   @Test
@@ -348,16 +299,12 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(branchToDelete).create(input);
 
-    assertThat(listIncompleteTasks("refs/heads/(todelete|master)")).hasSize(2);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, branchToDelete) != null);
     }
 
     gApi.projects().name(project.get()).branch(branchToDelete).delete();
 
-    assertThat(listIncompleteTasks(branchToDelete)).hasSize(1);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       if (mirror) {
         waitUntil(() -> checkedGetRef(repo, branchToDelete) == null);
@@ -441,19 +388,7 @@
     }
   }
 
-  @Test
-  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
-    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
-    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
-    config.save();
-    reloadConfig();
-    assertThat(tasksStorage.listRunning()).hasSize(0);
-    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
-
-    waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
-    waitUntil(() -> tasksStorage.listRunning().size() == 0);
-  }
-
+  // TODO: Move to ReplicationStorageIT
   @Test
   public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception {
     setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS);
@@ -467,6 +402,7 @@
     waitUntil(() -> isTaskCleanedUp());
   }
 
+  // TODO: Move to ReplicationStorageIT
   @Test
   public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries()
       throws Exception {
@@ -505,118 +441,6 @@
     return pushOne == null ? false : pushOne.isRetrying();
   }
 
-  @Test
-  public void shouldFirePendingOnlyToRemainingUris() throws Exception {
-    String suffix1 = "replica1";
-    String suffix2 = "replica2";
-    Project.NameKey target1 = createTestProject(project + suffix1);
-    Project.NameKey target2 = createTestProject(project + suffix2);
-    String remote1 = "foo1";
-    String remote2 = "foo2";
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE, false);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
-    reloadConfig();
-
-    String changeRef = createChange().getPatchSet().refName();
-
-    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
-        .forEach(
-            (update) -> {
-              try {
-                UriUpdates uriUpdates = TestUriUpdates.create(update);
-                tasksStorage.start(uriUpdates);
-                tasksStorage.finish(uriUpdates);
-              } catch (URISyntaxException e) {
-              }
-            });
-
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
-    reloadConfig();
-
-    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
-
-    assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
-    assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
-  }
-
-  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
-    try (Repository repo = repoManager.openRepository(project)) {
-      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
-      return true;
-    } catch (InterruptedException e) {
-      return false;
-    } catch (Exception e) {
-      throw new RuntimeException("Cannot open repo for project" + project, e);
-    }
-  }
-
-  private Ref getRef(Repository repo, String branchName) throws IOException {
-    return repo.getRefDatabase().exactRef(branchName);
-  }
-
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, false);
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
-      throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, mirror);
-  }
-
-  private void setReplicationDestination(
-      String remoteName,
-      String replicaSuffix,
-      Optional<String> project,
-      int replicationDelay,
-      boolean mirror)
-      throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
-  }
-
-  private FileBasedConfig setReplicationDestination(
-      String remoteName, List<String> replicaSuffixes, Optional<String> project)
-      throws IOException {
-    return setReplicationDestination(
-        remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
-  }
-
-  private FileBasedConfig setReplicationDestination(
-      String remoteName,
-      List<String> replicaSuffixes,
-      Optional<String> project,
-      int replicationDelay,
-      boolean mirror)
-      throws IOException {
-    List<String> replicaUrls =
-        replicaSuffixes.stream()
-            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
-            .collect(toList());
-    config.setStringList("remote", remoteName, "url", replicaUrls);
-    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
-    config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
-    config.setBoolean("remote", remoteName, "mirror", mirror);
-    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
-    config.setBoolean("gerrit", null, "autoReload", true);
-    config.save();
-    return config;
-  }
-
   private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
       throws IOException {
     config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion);
@@ -627,10 +451,6 @@
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
 
-  private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().reload();
-  }
-
   private void shutdownDestinations() {
     getInstance(DestinationsCollection.class).shutdown();
   }
@@ -643,51 +463,33 @@
     getReplicationQueueInstance().stop();
   }
 
-  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
-    return getInstance(AutoReloadConfigDecorator.class);
-  }
-
   private ReplicationQueue getReplicationQueueInstance() {
     return getInstance(ReplicationQueue.class);
   }
 
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
+  private ObjectId createNewBranchWithoutPush(String fromBranch, String newBranch)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(project);
+        RevWalk walk = new RevWalk(repo)) {
+      Ref ref = repo.exactRef(fromBranch);
+      RevCommit tip = null;
+      if (ref != null) {
+        tip = walk.parseCommit(ref.getObjectId());
+      }
+      RefUpdate update = repo.updateRef(newBranch);
+      update.setNewObjectId(tip);
+      update.update(walk);
+      return update.getNewObjectId();
+    }
   }
 
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
-      String changeRef, String remote) {
-    return changeReplicationTasksForRemote(streamIncompleteTasks(), changeRef, remote);
-  }
-
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
-      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
-    return updates
-        .filter(task -> changeRef.equals(task.ref))
-        .filter(task -> remote.equals(task.remote));
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
-
-  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    return streamIncompleteTasks()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
-        .collect(toList());
-  }
-
-  private List<ReplicateRefUpdate> listIncompleteTasks() {
-    return streamIncompleteTasks().collect(toList());
-  }
-
-  @SuppressWarnings(
-      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
-  private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
-    synchronized (tasksStorage) {
-      return Stream.concat(
-          tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream());
+  private boolean isTaskCleanedUp() {
+    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+    // Files.list returns a stream backed by an open directory; close it once counted.
+    try (java.util.stream.Stream<Path> running = Files.list(refUpdates.resolve("running"))) {
+      return running.count() == 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
     }
   }
 
@@ -706,22 +508,4 @@
       }
     }
   }
-
-  private boolean isTaskCleanedUp() {
-    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
-    Path runningUpdates = refUpdates.resolve("running");
-    try {
-      return Files.list(runningUpdates).count() == 0;
-    } catch (IOException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    }
-  }
-
-  private boolean nonEmptyProjectExists(Project.NameKey name) {
-    try (Repository r = repoManager.openRepository(name)) {
-      return !r.getAllRefsByPeeledObjectId().isEmpty();
-    } catch (Exception e) {
-      return false;
-    }
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
new file mode 100644
index 0000000..88b8f87
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -0,0 +1,312 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
+import static java.util.stream.Collectors.toList;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.Test;
+
+/**
+ * The tests in this class aim to ensure events are correctly written and read from storage. They
+ * typically do this by setting up replication destinations with long delays, performing actions
+ * that are expected to write to storage, reloading the configuration (which should read from
+ * storage), and then confirming the actions complete as expected.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageIT extends ReplicationDaemon {
+  private static final int TEST_TASK_FINISH_SECONDS = 1;
+  protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+      Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+  protected ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listWaitingReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(NO_OP), false);
+
+    assertThat(listWaitingReplicationTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+  }
+
+  @Test
+  public void shouldFirePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    // Mark the change's tasks for remote1 as started and finished in storage, leaving remote2's
+    // pending. A malformed task URI propagates and fails the test instead of being swallowed.
+    List<ReplicateRefUpdate> remote1Updates =
+        changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+            .collect(toList());
+    for (ReplicateRefUpdate update : remote1Updates) {
+      UriUpdates uriUpdates = TestUriUpdates.create(update);
+      tasksStorage.start(uriUpdates);
+      tasksStorage.finish(uriUpdates);
+    }
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    // Mark the change's tasks for remote1 as started and finished in storage, leaving remote2's
+    // pending. A malformed task URI propagates and fails the test instead of being swallowed.
+    List<ReplicateRefUpdate> remote1Updates =
+        changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+            .collect(toList());
+    for (ReplicateRefUpdate update : remote1Updates) {
+      UriUpdates uriUpdates = TestUriUpdates.create(update);
+      tasksStorage.start(uriUpdates);
+      tasksStorage.finish(uriUpdates);
+    }
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target2, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(false);
+  }
+
+  @Test
+  public void shouldFirePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    // One map per ref, as a Map keyed by target project can track only a single ref per target.
+    // Allow the time 2 pushes take: each remote only has 1 thread and needs to push 2 events.
+    Duration timeout = TEST_PUSH_TIMEOUT.plus(TEST_PUSH_TIMEOUT);
+    for (String changeRef : Arrays.asList(changeRef1, changeRef2)) {
+      Map<Project.NameKey, String> refsByProject = new HashMap<>();
+      refsByProject.put(target1, changeRef);
+      refsByProject.put(target2, changeRef);
+      assertThat(isPushCompleted(refsByProject, timeout)).isEqualTo(true);
+    }
+  }
+
+  @Test
+  public void shouldMatchTemplatedUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
+    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  @Test
+  public void shouldMatchRealUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
+    String expectedURI = urlMatch;
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  @Test
+  public void shouldReplicateBranchDeletionWhenMirror() throws Exception {
+    replicateBranchDeletion(true);
+  }
+
+  @Test
+  public void shouldNotReplicateBranchDeletionWhenNotMirror() throws Exception {
+    replicateBranchDeletion(false);
+  }
+
+  private void replicateBranchDeletion(boolean mirror) throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String branchToDelete = "refs/heads/todelete";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(branchToDelete).create(input);
+    isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
+    reloadConfig();
+
+    gApi.projects().name(project.get()).branch(branchToDelete).delete();
+
+    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
+  }
+
+  @Test
+  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(tasksStorage.listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
+
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
+    WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
+  }
+
+  /** Streams the waiting tasks in storage that replicate {@code changeRef} to {@code remote}. */
+  private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
+      String changeRef, String remote) {
+    // Reuse the shared filter so both helpers select tasks by the same criteria.
+    return changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote);
+  }
+
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
+    return updates
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
+  private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.listWaiting().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
new file mode 100644
index 0000000..42c0914
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -0,0 +1,203 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertContainsExactly;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageMPTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH =
+      ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
+      new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+  protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
+
+  protected ReplicationTasksStorage nodeA;
+  protected ReplicationTasksStorage nodeB;
+  protected ReplicationTasksStorage persistedView;
+  protected FileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix());
+    Path storageSite = fileSystem.getPath("replication_site");
+    nodeA = new ReplicationTasksStorage(storageSite);
+    nodeB = new ReplicationTasksStorage(storageSite);
+    persistedView = new ReplicationTasksStorage(storageSite);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    persistedView = null;
+    nodeA = null;
+    nodeB = null;
+    fileSystem.close();
+  }
+
+  @Test
+  public void sameTaskCreatedByOtherNodeIsDeduped() {
+    nodeA.create(REF_UPDATE);
+
+    nodeB.create(REF_UPDATE);
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+  }
+
+  @Test
+  public void waitingTaskCanBeCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+
+    nodeA.reset(URI_UPDATES);
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeResetAndCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeA.reset(URI_UPDATES);
+    nodeB.start(URI_UPDATES);
+
+    nodeB.reset(URI_UPDATES);
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeResetByOtherNodeAndCompletedByOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeA.reset(URI_UPDATES);
+    nodeB.start(URI_UPDATES);
+    nodeB.reset(URI_UPDATES);
+
+    nodeA.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+
+    nodeB.resetAll();
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeA.finish(URI_UPDATES);
+    // Bug: https://crbug.com/gerrit/12973
+    // assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNodeFastOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeB.resetAll();
+
+    nodeA.finish(URI_UPDATES);
+    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNodeSlowOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeB.resetAll();
+
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+    // nodeA reports its finish only after nodeB completed the task; nothing may reappear.
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void multipleNodesCanReplicateSameRef() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+
+    nodeB.create(REF_UPDATE);
+    nodeB.start(URI_UPDATES);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
+    try {
+      return TestUriUpdates.create(refUpdate);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Cannot instantiate UriUpdates object", e);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
new file mode 100644
index 0000000..23d6759
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -0,0 +1,194 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertIsRunning;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertIsWaiting;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertNotRunning;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertNotWaiting;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageTaskMPTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH =
+      ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
+      new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+
+  protected FileSystem fileSystem; // Jimfs file system, recreated in setUp() for every test
+  protected Path storageSite; // storage location handed to both nodes
+  protected ReplicationTasksStorage nodeA; // first storage instance over storageSite
+  protected ReplicationTasksStorage nodeB; // second storage instance over the same storageSite
+  protected ReplicationTasksStorage.Task taskA; // nodeA's task for REF_UPDATE
+  protected ReplicationTasksStorage.Task taskB; // nodeB's task for the same REF_UPDATE
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix()); // in-memory, so nothing hits disk
+    storageSite = fileSystem.getPath("replication_site");
+    nodeA = new ReplicationTasksStorage(storageSite); // both nodes share one storage path
+    nodeB = new ReplicationTasksStorage(storageSite);
+    taskA = nodeA.new Task(REF_UPDATE); // each node gets its own Task for the same ref update
+    taskB = nodeB.new Task(REF_UPDATE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fileSystem.close(); // release the per-test file system created in setUp()
+  }
+
+  @Test
+  public void waitingTaskCanBeCompletedByOtherNode() {
+    taskA.create();
+
+    taskB.start(); // node B starts the task that node A only created
+    assertIsRunning(taskA); // also visible through node A's task for the same ref update
+
+    taskB.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void resetTaskCanBeCompletedByOtherNode() {
+    taskA.create();
+    taskA.start();
+
+    taskA.reset(); // expected to put the started task back into waiting
+    assertIsWaiting(taskA);
+
+    taskB.start(); // node B picks up the task that node A reset
+    assertIsRunning(taskA);
+    assertIsRunning(taskB);
+
+    taskB.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void retryCanBeRetriedAndCompletedByOtherNode() {
+    taskA.create();
+    taskA.start();
+    taskA.reset(); // first attempt, on node A, is reset
+    taskB.start();
+
+    taskB.reset(); // second attempt, on node B, is reset as well
+    assertIsWaiting(taskA);
+
+    taskB.start(); // node B tries again and this time finishes the task
+    assertIsRunning(taskA);
+
+    taskB.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void retryCanBeRetriedOtherNodeAndCompletedByOriginalNode() {
+    taskA.create();
+    taskA.start();
+    taskA.reset(); // first attempt, on node A, is reset
+    taskB.start();
+    taskB.reset(); // second attempt, on node B, is reset as well
+
+    taskA.start(); // the original node takes the task again and finishes it
+    assertIsRunning(taskA);
+
+    taskA.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNode() {
+    taskA.create();
+    taskA.start();
+
+    nodeB.resetAll(); // called on node B's storage while node A's task is started
+    assertIsWaiting(taskA);
+
+    taskB.create();
+    taskB.start();
+    assertIsRunning(taskA);
+
+    taskA.finish();
+    // Bug: https://crbug.com/gerrit/12973 (check below presumably fails until it is fixed)
+    // assertIsRunning(taskB);
+
+    taskB.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
+    taskA.create();
+    taskA.start();
+    nodeB.resetAll();
+
+    taskA.finish(); // node A finishes after node B's resetAll() but before taskB starts
+    assertIsWaiting(taskA); // expected to stay waiting so that node B can still run it
+
+    taskB.start();
+    assertIsRunning(taskA);
+
+    taskB.finish();
+    assertNotRunning(taskA);
+    assertNotWaiting(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+
+  @Test
+  public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
+    taskA.create();
+    taskA.start();
+    nodeB.resetAll();
+
+    taskB.start();
+    assertIsRunning(taskA);
+
+    taskB.finish(); // node B completes first
+    assertNotWaiting(taskA);
+    assertNotRunning(taskA);
+
+    taskA.finish(); // node A's late finish must leave the task neither waiting nor running
+    assertNotWaiting(taskA);
+    assertNotRunning(taskA);
+    assertNotRunning(taskB);
+    assertNotWaiting(taskB);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index 178f125..d9fbbe5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -346,23 +346,23 @@
     assertIsWaiting(persistedView);
   }
 
-  private void assertIsWaiting(Task task) {
+  protected static void assertIsWaiting(Task task) { // used by ReplicationTasksStorageTaskMPTest
     assertTrue(task.isWaiting());
   }
 
-  private void assertNotWaiting(Task task) {
+  protected static void assertNotWaiting(Task task) { // used by ReplicationTasksStorageTaskMPTest
     assertFalse(task.isWaiting());
   }
 
-  private void assertIsRunning(Task task) {
+  protected static void assertIsRunning(Task task) { // used by ReplicationTasksStorageTaskMPTest
     assertTrue(whiteBoxIsRunning(task));
   }
 
-  private void assertNotRunning(Task task) {
+  protected static void assertNotRunning(Task task) { // used by ReplicationTasksStorageTaskMPTest
     assertFalse(whiteBoxIsRunning(task));
   }
 
-  private boolean whiteBoxIsRunning(Task task) {
+  private static boolean whiteBoxIsRunning(Task task) { // called by the static assert helpers
     return Files.exists(task.running);
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index faabc90..141f739 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -373,12 +373,12 @@
     assertThat(storage.listRunning()).isEmpty();
   }
 
-  private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
+  protected static void assertNoIncompleteTasks(ReplicationTasksStorage storage) { // for MP tests
     assertThat(storage.listWaiting()).isEmpty();
     assertThat(storage.listRunning()).isEmpty();
   }
 
-  private void assertContainsExactly(
+  protected static void assertContainsExactly(
       List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) {
     assertThat(all).hasSize(refUpdates.length);
     for (int i = 0; i < refUpdates.length; i++) {
@@ -386,7 +386,7 @@
     }
   }
 
-  private boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
+  public static boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
     return (one == null && two == null)
         || (one != null
             && two != null