Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Move storage portion of replicateBranchDeletion ITs
  Refactor Replication*IT tests to share a base class
  ReplicationIT: Add shouldMatch* e2e tests
  ReplicationStorageIT: Move shouldMatch* tests from ReplicationIT
  ReplicationStorageIT: Add shouldFire*ChangeRefs tests
  Move storage-based ITs into ReplicationStorageIT
  ReplicationQueue: Remove unused method

This change does not try to reimpose the breakdown of tests that was
done in 3.0. That will be done in follow up change(s) to improve
reviewability of this change.

Change-Id: I81d27fd47da8eecad3aca36d8e6400679fb564a3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 0a34d11..4625407 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
 import com.google.gerrit.entities.Project;
@@ -107,11 +106,6 @@
     return replaying;
   }
 
-  void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
-    scheduleFullSync(project, urlMatch, state, false);
-  }
-
-  @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
     fire(project, urlMatch, PushOne.ALL_REFS, state, now);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
new file mode 100644
index 0000000..f393036
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -0,0 +1,169 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+/**
+ * This class can be extended by any Replication*IT class and provides common setup and helper
+ * methods.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDaemon extends LightweightPluginDaemonTest {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  protected static final Optional<String> ALL_PROJECTS = Optional.empty();
+
+  protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+  protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
+  protected static final int TEST_PUSH_TIME_SECONDS = 1;
+  protected static final Duration TEST_PUSH_TIMEOUT =
+      Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+
+  @Inject protected SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  protected Path gitPath;
+  protected FileBasedConfig config;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS, mirror);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+      throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay)
+      throws IOException {
+    setReplicationDestination(remoteName, replicaSuffixes, project, replicationDelay, false);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
+  }
+
+  protected FileBasedConfig setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
+    config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
+    config.setBoolean("remote", remoteName, "mirror", mirror);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+    return config;
+  }
+
+  protected Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  protected boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
+  protected Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  protected void reloadConfig() {
+    getAutoReloadConfigDecoratorInstance().reload();
+  }
+
+  protected AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  protected <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 3c6ab0c..391c62d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -17,14 +17,10 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
@@ -32,68 +28,45 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Key;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
 import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.junit.Test;
 
 @UseLocalDisk
 @TestPlugin(
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 1;
-  private static final int TEST_REPLICATION_RETRY = 1;
+public class ReplicationIT extends ReplicationDaemon {
   private static final int TEST_PROJECT_CREATION_SECONDS = 10;
   private static final Duration TEST_TIMEOUT =
-      Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + 1);
 
   private static final Duration TEST_NEW_PROJECT_TIMEOUT =
       Duration.ofSeconds(
-          (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + TEST_PROJECT_CREATION_SECONDS);
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+              + TEST_PROJECT_CREATION_SECONDS);
 
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
   private Path pluginDataDir;
-  private Path gitPath;
   private Path storagePath;
-  private FileBasedConfig config;
   private ReplicationTasksStorage tasksStorage;
 
   @Override
   public void setUpTestPlugin() throws Exception {
-    gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    setReplicationDestination(
-        "remote1",
-        "suffix1",
-        Optional.of("not-used-project")); // Simulates a full replication.config initialization
-    config.save();
-
     super.setUpTestPlugin();
 
     pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
@@ -218,78 +191,55 @@
   }
 
   @Test
-  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
-    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
-    createTestProject(project + "replica1");
-    createTestProject(project + "replica2");
-
-    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    dest1.save();
-    dest2.save();
-    reloadConfig();
-
-    createChange();
-
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-  }
-
-  @Test
-  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
-    createTestProject(project + "replica");
-
-    setReplicationDestination("foo", "replica", ALL_PROJECTS);
-    reloadConfig();
-
-    plugin
-        .getSysInjector()
-        .getInstance(ReplicationQueue.class)
-        .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
-
-    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
-  }
-
-  @Test
   public void shouldMatchTemplatedURL() throws Exception {
-    createTestProject(project + "replica");
+    Project.NameKey targetProject = createTestProject(project + "replica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
-    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
-    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+    }
   }
 
   @Test
   public void shouldMatchRealURL() throws Exception {
-    createTestProject(project + "replica");
+    Project.NameKey targetProject = createTestProject(project + "replica");
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
 
+    String newRef = "refs/heads/newForTest";
+    ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
     String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
-    String expectedURI = urlMatch;
 
     plugin
         .getSysInjector()
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks()).hasSize(1);
-    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+      Ref targetBranchRef = getRef(repo, newRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+    }
   }
 
   @Test
@@ -335,16 +285,12 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(branchToDelete).create(input);
 
-    assertThat(listIncompleteTasks("refs/heads/(todelete|master)")).hasSize(2);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, branchToDelete) != null);
     }
 
     gApi.projects().name(project.get()).branch(branchToDelete).delete();
 
-    assertThat(listIncompleteTasks(branchToDelete)).hasSize(1);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       if (mirror) {
         waitUntil(() -> checkedGetRef(repo, branchToDelete) == null);
@@ -441,118 +387,10 @@
     waitUntil(() -> tasksStorage.listRunning().size() == 0);
   }
 
-  @Test
-  public void shouldFirePendingOnlyToRemainingUris() throws Exception {
-    String suffix1 = "replica1";
-    String suffix2 = "replica2";
-    Project.NameKey target1 = createTestProject(project + suffix1);
-    Project.NameKey target2 = createTestProject(project + suffix2);
-    String remote1 = "foo1";
-    String remote2 = "foo2";
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE, false);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
-    reloadConfig();
-
-    String changeRef = createChange().getPatchSet().refName();
-
-    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
-        .forEach(
-            (update) -> {
-              try {
-                UriUpdates uriUpdates = TestUriUpdates.create(update);
-                tasksStorage.start(uriUpdates);
-                tasksStorage.finish(uriUpdates);
-              } catch (URISyntaxException e) {
-              }
-            });
-
-    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
-    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
-    reloadConfig();
-
-    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
-    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
-
-    assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
-    assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
-  }
-
-  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
-    try (Repository repo = repoManager.openRepository(project)) {
-      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
-      return true;
-    } catch (InterruptedException e) {
-      return false;
-    } catch (Exception e) {
-      throw new RuntimeException("Cannot open repo for project" + project, e);
-    }
-  }
-
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
 
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, false);
-  }
-
-  private void setReplicationDestination(
-      String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
-      throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, mirror);
-  }
-
-  private void setReplicationDestination(
-      String remoteName,
-      String replicaSuffix,
-      Optional<String> project,
-      int replicationDelay,
-      boolean mirror)
-      throws IOException {
-    setReplicationDestination(
-        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
-  }
-
-  private FileBasedConfig setReplicationDestination(
-      String remoteName, List<String> replicaSuffixes, Optional<String> project)
-      throws IOException {
-    return setReplicationDestination(
-        remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
-  }
-
-  private FileBasedConfig setReplicationDestination(
-      String remoteName,
-      List<String> replicaSuffixes,
-      Optional<String> project,
-      int replicationDelay,
-      boolean mirror)
-      throws IOException {
-    List<String> replicaUrls =
-        replicaSuffixes.stream()
-            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
-            .collect(toList());
-    config.setStringList("remote", remoteName, "url", replicaUrls);
-    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
-    config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
-    config.setBoolean("remote", remoteName, "mirror", mirror);
-    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
-    config.setBoolean("gerrit", null, "autoReload", true);
-    config.save();
-    return config;
-  }
-
   private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
       throws IOException {
     config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion);
@@ -563,10 +401,6 @@
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
 
-  private void reloadConfig() {
-    getAutoReloadConfigDecoratorInstance().reload();
-  }
-
   private void shutdownDestinations() {
     getInstance(DestinationsCollection.class).shutdown();
   }
@@ -579,54 +413,10 @@
     getReplicationQueueInstance().stop();
   }
 
-  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
-    return getInstance(AutoReloadConfigDecorator.class);
-  }
-
   private ReplicationQueue getReplicationQueueInstance() {
     return getInstance(ReplicationQueue.class);
   }
 
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
-  }
-
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
-      String changeRef, String remote) {
-    return changeReplicationTasksForRemote(streamIncompleteTasks(), changeRef, remote);
-  }
-
-  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
-      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
-    return updates
-        .filter(task -> changeRef.equals(task.ref))
-        .filter(task -> remote.equals(task.remote));
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
-
-  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    return streamIncompleteTasks()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
-        .collect(toList());
-  }
-
-  private List<ReplicateRefUpdate> listIncompleteTasks() {
-    return streamIncompleteTasks().collect(toList());
-  }
-
-  @SuppressWarnings(
-      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
-  private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
-    synchronized (tasksStorage) {
-      return Stream.concat(
-          tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream());
-    }
-  }
-
   public void cleanupReplicationTasks() throws IOException {
     cleanupReplicationTasks(storagePath);
   }
@@ -650,4 +440,20 @@
       return false;
     }
   }
+
+  private ObjectId createNewBranchWithoutPush(String fromBranch, String newBranch)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(project);
+        RevWalk walk = new RevWalk(repo)) {
+      Ref ref = repo.exactRef(fromBranch);
+      RevCommit tip = null;
+      if (ref != null) {
+        tip = walk.parseCommit(ref.getObjectId());
+      }
+      RefUpdate update = repo.updateRef(newBranch);
+      update.setNewObjectId(tip);
+      update.update(walk);
+      return update.getNewObjectId();
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
new file mode 100644
index 0000000..c0ae479
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -0,0 +1,280 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
+import static java.util.stream.Collectors.toList;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.Test;
+
+/**
+ * The tests in this class aim to ensure events are correctly written and read from storage. They
+ * typically do this by setting up replication destinations with long delays, performing actions
+ * that are expected to write to storage, reloading the configuration (which should read from
+ * storage), and then confirming the actions complete as expected.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageIT extends ReplicationDaemon {
+  protected ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    super.setUpTestPlugin();
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listWaitingReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(NO_OP), false);
+
+    assertThat(listWaitingReplicationTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+  }
+
+  @Test
+  public void shouldFirePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+              }
+            });
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+              }
+            });
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target2, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(false);
+  }
+
+  @Test
+  public void shouldFirePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target1, changeRef1, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target2, changeRef1, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef2, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target2, changeRef2, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+  }
+
+  @Test
+  public void shouldMatchTemplatedUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
+    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  @Test
+  public void shouldMatchRealUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
+    String expectedURI = urlMatch;
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(tasksStorage.listWaiting()).hasSize(1);
+    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
+      assertThat(task.uri).isEqualTo(expectedURI);
+      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
+    }
+  }
+
+  @Test
+  public void shouldReplicateBranchDeletionWhenMirror() throws Exception {
+    replicateBranchDeletion(true);
+  }
+
+  @Test
+  public void shouldNotReplicateBranchDeletionWhenNotMirror() throws Exception {
+    replicateBranchDeletion(false);
+  }
+
+  private void replicateBranchDeletion(boolean mirror) throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String branchToDelete = "refs/heads/todelete";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(branchToDelete).create(input);
+    isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
+    reloadConfig();
+
+    gApi.projects().name(project.get()).branch(branchToDelete).delete();
+
+    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
+  }
+
+  private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
+      String changeRef, String remote) {
+    return tasksStorage.listWaiting().stream()
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
+    return updates
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
+  private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.listWaiting().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+}