Merge "ReplicationTasksStorage: Add multi-primary unit tests" into stable-2.16
diff --git a/BUILD b/BUILD
index 50615d8..72a3fc8 100644
--- a/BUILD
+++ b/BUILD
@@ -23,6 +23,18 @@
     name = "replication_tests",
     srcs = glob([
         "src/test/java/**/*Test.java",
+    ]),
+    tags = ["replication"],
+    visibility = ["//visibility:public"],
+    deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+        ":replication__plugin",
+        ":replication_util",
+    ],
+)
+
+junit_tests(
+    name = "replication_it",
+    srcs = glob([
         "src/test/java/**/*IT.java",
     ]),
     tags = ["replication"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e704978..22b6cce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,7 +17,6 @@
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
@@ -99,7 +98,9 @@
     if (!running) {
       config.startup(workQueue);
       running = true;
-      firePendingEvents();
+      Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
+      t.setDaemon(true);
+      t.start();
       fireBeforeStartupEvents();
     }
   }
@@ -121,11 +122,6 @@
     return replaying;
   }
 
-  void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
-    scheduleFullSync(project, urlMatch, state, false);
-  }
-
-  @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
     if (!running) {
@@ -216,6 +212,8 @@
           repLog.error("Encountered malformed URI for persisted event %s", t);
         }
       }
+    } catch (Throwable e) {
+      repLog.error("Unexpected error while firing pending events", e);
     } finally {
       replaying = false;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index bfb7e95..fd92e27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -37,8 +37,6 @@
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private boolean disableDeleteForTesting;
-
   public static class ReplicateRefUpdate {
     public final String project;
     public final String ref;
@@ -91,21 +89,11 @@
     return eventKey;
   }
 
-  @VisibleForTesting
-  public void disableDeleteForTesting(boolean deleteDisabled) {
-    this.disableDeleteForTesting = deleteDisabled;
-  }
-
   public void delete(ReplicateRefUpdate r) {
     String key = r.project + "\n" + r.ref + "\n" + r.uri + "\n" + r.remote;
     String taskKey = sha1(key).name();
     Path file = refUpdates().resolve(taskKey);
 
-    if (disableDeleteForTesting) {
-      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
-      return;
-    }
-
     try {
       logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
       Files.delete(file);
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 8291421..e12ec92 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -11,8 +11,7 @@
 ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
   [--now]
   [--wait]
-  [--url <PATTERN>]
-  {--all | <PROJECT PATTERN> ...}
+  {--url <PATTERN> | [--url <PATTERN>] --all | [--url <PATTERN>] <PROJECT PATTERN> ...}
 ```
 
 DESCRIPTION
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
new file mode 100644
index 0000000..5e38570
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+/**
+ * This class can be extended by any Replication*IT class and provides common setup and helper
+ * methods.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDaemon extends LightweightPluginDaemonTest {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  protected static final Optional<String> ALL_PROJECTS = Optional.empty();
+
+  protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+  protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
+  protected static final int TEST_PUSH_TIME_SECONDS = 1;
+  protected static final Duration TEST_PUSH_TIMEOUT =
+      Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+
+  @Inject protected SitePaths sitePaths;
+  protected Path gitPath;
+  protected FileBasedConfig config;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_RETRY_MINUTES);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+      throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+  }
+
+  protected FileBasedConfig setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay)
+      throws IOException {
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
+    config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.save();
+    return config;
+  }
+
+  protected Project.NameKey createTestProject(String name) throws Exception {
+    return createProject(name);
+  }
+
+  protected boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
+  protected Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  protected void reloadConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 553665d..706a2c6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -17,83 +17,45 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
-import com.google.inject.Key;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
 import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.junit.Test;
 
 @UseLocalDisk
 @TestPlugin(
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 1;
-  private static final int TEST_REPLICATION_RETRY = 1;
+public class ReplicationIT extends ReplicationDaemon {
+  private static final int TEST_PROJECT_CREATION_SECONDS = 10;
   private static final Duration TEST_TIMEOUT =
-      Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + 1);
 
-  @Inject private SitePaths sitePaths;
+  private static final Duration TEST_NEW_PROJECT_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+              + TEST_PROJECT_CREATION_SECONDS);
+
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
-  private Path pluginDataDir;
-  private Path gitPath;
-  private Path storagePath;
-  private FileBasedConfig config;
-  private ReplicationTasksStorage tasksStorage;
-
-  @Override
-  public void setUpTestPlugin() throws Exception {
-    gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    setReplicationDestination(
-        "remote1",
-        "suffix1",
-        Optional.of("not-used-project")); // Simulates a full replication.config initialization
-    config.save();
-
-    super.setUpTestPlugin();
-
-    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
-    storagePath = pluginDataDir.resolve("ref-updates");
-    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
-    cleanupReplicationTasks();
-    tasksStorage.disableDeleteForTesting(true);
-  }
 
   @Test
   public void shouldReplicateNewProject() throws Exception {
@@ -102,9 +64,9 @@
 
     Project.NameKey sourceProject = createTestProject("foo");
 
-    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
-
-    waitUntil(() -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica")));
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -149,8 +111,6 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().getRefName();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
 
@@ -172,8 +132,6 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(newBranch).create(input);
 
-    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
-
     try (Repository repo = repoManager.openRepository(targetProject);
         Repository sourceRepo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, newBranch) != null);
@@ -198,8 +156,6 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().getRefName();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
-
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
       waitUntil(
@@ -217,81 +173,55 @@
   }
 
   @Test
-  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
-    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
-    createTestProject("projectreplica1");
-    createTestProject("projectreplica2");
-
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    reloadConfig();
-
-    createChange();
-
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-  }
-
-  @Test
-  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
-    createTestProject("projectreplica");
-
-    setReplicationDestination("foo", "replica", ALL_PROJECTS);
-    reloadConfig();
-
-    plugin
-        .getSysInjector()
-        .getInstance(ReplicationQueue.class)
-        .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
-
-    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
-  }
-
-  @Test
   public void shouldMatchTemplatedURL() throws Exception {
-    createTestProject("projectreplica");
+    Project.NameKey targetProject = createTestProject("projectreplica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
-    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
     }
   }
 
   @Test
   public void shouldMatchRealURL() throws Exception {
-    createTestProject("projectreplica");
+    Project.NameKey targetProject = createTestProject("projectreplica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
-    String expectedURI = urlMatch;
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listReplicationTasks(".*")).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
     }
-    assertThat(tasksStorage.list()).isNotEmpty();
   }
 
   @Test
@@ -363,99 +293,10 @@
     }
   }
 
-  @Test
-  public void shouldFirePendingOnlyToStoredUri() throws Exception {
-    String suffix1 = "replica1";
-    String suffix2 = "replica2";
-    Project.NameKey target1 = createTestProject("project" + suffix1);
-    Project.NameKey target2 = createTestProject("project" + suffix2);
-    String remote1 = "foo1";
-    String remote2 = "foo2";
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
-    reloadConfig();
-
-    String changeRef = createChange().getPatchSet().getRefName();
-
-    tasksStorage.disableDeleteForTesting(false);
-    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
-    tasksStorage.disableDeleteForTesting(true);
-
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
-    reloadConfig();
-
-    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
-
-    assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
-    assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return createProject(name);
-  }
-
-  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
-    try (Repository repo = repoManager.openRepository(project)) {
-      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
-      return true;
-    } catch (InterruptedException e) {
-      return false;
-    } catch (Exception e) {
-      throw new RuntimeException("Cannot open repo for project" + project, e);
-    }
-  }
-
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
 
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY);
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
-      throws IOException {
-    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
-  }
-
-  private void setReplicationDestination(
-      String remoteName, List<String> replicaSuffixes, Optional<String> project)
-      throws IOException {
-    setReplicationDestination(remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY);
-  }
-
-  private void setReplicationDestination(
-      String remoteName,
-      List<String> replicaSuffixes,
-      Optional<String> project,
-      int replicationDelay)
-      throws IOException {
-
-    List<String> replicaUrls =
-        replicaSuffixes.stream()
-            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
-            .collect(toList());
-    config.setStringList("remote", remoteName, "url", replicaUrls);
-    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
-    config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
-    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
-    config.save();
-  }
-
   private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
       throws IOException {
     config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion);
@@ -466,36 +307,10 @@
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
 
-  private void reloadConfig() {
-    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
-  }
-
   private void shutdownConfig() {
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
   }
 
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
-      String changeRef, String remote) {
-    return tasksStorage.list().stream()
-        .filter(task -> changeRef.equals(task.ref))
-        .filter(task -> remote.equals(task.remote));
-  }
-
-  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
-        .collect(toList());
-  }
-
-  private void cleanupReplicationTasks() throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
-      for (Path path : files) {
-        path.toFile().delete();
-      }
-    }
-  }
-
   private boolean nonEmptyProjectExists(Project.NameKey name) {
     try (Repository r = repoManager.openRepository(name)) {
       return !r.getAllRefsByPeeledObjectId().isEmpty();
@@ -503,4 +318,20 @@
       return false;
     }
   }
+
+  private ObjectId createNewBranchWithoutPush(String fromBranch, String newBranch)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(project);
+        RevWalk walk = new RevWalk(repo)) {
+      Ref ref = repo.exactRef(fromBranch);
+      RevCommit tip = null;
+      if (ref != null) {
+        tip = walk.parseCommit(ref.getObjectId());
+      }
+      RefUpdate update = repo.updateRef(newBranch);
+      update.setNewObjectId(tip);
+      update.update(walk);
+      return update.getNewObjectId();
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
new file mode 100644
index 0000000..fdca243
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -0,0 +1,223 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
+import static java.util.stream.Collectors.toList;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.reviewdb.client.Project;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.Test;
+
+/**
+ * The tests in this class aim to ensure events are correctly written and read from storage. They
+ * typically do this by setting up replication destinations with long delays, performing actions
+ * that are expected to write to storage, reloading the configuration (which should read from
+ * storage), and then confirming the actions complete as expected.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageIT extends ReplicationDaemon {
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    super.setUpTestPlugin();
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject("projectreplica1");
+    createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(NO_OP), false);
+
+    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+  }
+
+  @Test
+  public void shouldFirePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().getRefName();
+    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+    reloadConfig();
+
+    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject("project" + suffix1);
+    Project.NameKey target2 = createTestProject("project" + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().getRefName();
+    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target2, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(false);
+  }
+
+  @Test
+  public void shouldFirePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().getRefName();
+    String changeRef2 = createChange().getPatchSet().getRefName();
+    reloadConfig();
+
+    assertThat(changeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
+    assertThat(changeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
+    assertThat(changeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
+    assertThat(changeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject("project" + suffix1);
+    Project.NameKey target2 = createTestProject("project" + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().getRefName();
+    String changeRef2 = createChange().getPatchSet().getRefName();
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target1, changeRef1, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target2, changeRef1, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef2, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target2, changeRef2, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+  }
+
+  @Test
+  public void shouldMatchTemplatedUrl() throws Exception {
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
+    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.list()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  @Test
+  public void shouldMatchRealUrl() throws Exception {
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
+    String expectedURI = urlMatch;
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.list()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      String changeRef, String remote) {
+    return tasksStorage.list().stream()
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+}