Merge branch 'stable-3.2' into master

* stable-3.2:
  ReplicationTasksStorage: Remove synchronized from list* methods
  TasksStorage: Add unit tests for reset() and resetAll()
  TasksStorage: Add canStartDifferentUris unit test
  ReplicationIT: Use streams to simplify
  ReplicationIT: Use PushOne.ALL_REFS constant
  ReplicationTasksStorageTest: Add a test for start()
  ReplicationTasksStorage: Remove test-only list* methods
  Fix synopsis in replication start cmd documentation
  TaskStorage: Fix assertContainsExactly
  TaskStorage: Rename unit tests with 'persist' in their names
  Don't wait for pending events to process on startup
  ReplicateRefUpdate: Drop awkward constructor
  TasksStorage: Replace delete() with start()+finish() in tests
  ReplicationTasksStorage.Task: Add unit tests
  ReplicationTasksStorage: Add unit tests

Change-Id: I2ebe8f3bf299a806726b2feba704750dc2edf239
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index d598925..4b155a7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -86,7 +86,7 @@
  * <p>Instance members are protected by the lock within PushQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
  */
-class PushOne implements ProjectRunnable, CanceledWhileRunning {
+class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates {
   private final ReplicationStateListener stateLog;
   static final String ALL_REFS = "..all..";
   static final String ID_KEY = "pushOneId";
@@ -231,7 +231,8 @@
     return canceled || canceledWhileRunning.get();
   }
 
-  URIish getURI() {
+  @Override
+  public URIish getURI() {
     return uri;
   }
 
@@ -246,7 +247,8 @@
     }
   }
 
-  Set<String> getRefs() {
+  @Override
+  public Set<String> getRefs() {
     return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e685237..5a4d56c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -88,7 +88,9 @@
       destinations.get().startup(workQueue);
       running = true;
       replicationTasksStorage.resetAll();
-      firePendingEvents();
+      Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
+      t.setDaemon(true);
+      t.start();
       fireBeforeStartupEvents();
       distributor = new Distributor(workQueue);
     }
@@ -211,6 +213,8 @@
           repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
         }
       }
+    } catch (Throwable e) {
+      repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
     } finally {
       replaying = false;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index bfd0886..4ef0d13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -71,10 +71,6 @@
     public final String uri;
     public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
-    }
-
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
       this.project = project;
       this.ref = ref;
@@ -90,14 +86,17 @@
 
   private static final Gson GSON = new Gson();
 
-  private final Path refUpdates;
   private final Path buildingUpdates;
   private final Path runningUpdates;
   private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
-    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    this(config.getEventsDirectory().resolve("ref-updates"));
+  }
+
+  @VisibleForTesting
+  public ReplicationTasksStorage(Path refUpdates) {
     buildingUpdates = refUpdates.resolve("building");
     runningUpdates = refUpdates.resolve("running");
     waitingUpdates = refUpdates.resolve("waiting");
@@ -112,20 +111,15 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  @VisibleForTesting
-  public void delete(ReplicateRefUpdate r) {
-    new Task(r).delete();
-  }
-
-  public synchronized void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public synchronized void start(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).start();
     }
   }
 
-  public synchronized void reset(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+  public synchronized void reset(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).reset();
     }
   }
 
@@ -135,35 +129,26 @@
     }
   }
 
-  public boolean isWaiting(PushOne push) {
-    return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+  public boolean isWaiting(UriUpdates uriUpdates) {
+    return uriUpdates.getReplicateRefUpdates().stream()
+        .map(update -> new Task(update))
+        .anyMatch(Task::isWaiting);
   }
 
-  public void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+  public void finish(UriUpdates uriUpdates) {
+    for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
+      new Task(update).finish();
     }
   }
 
-  public synchronized List<ReplicateRefUpdate> listWaiting() {
+  public List<ReplicateRefUpdate> listWaiting() {
     return list(createDir(waitingUpdates));
   }
 
-  @VisibleForTesting
-  public synchronized List<ReplicateRefUpdate> listRunning() {
+  public List<ReplicateRefUpdate> listRunning() {
     return list(createDir(runningUpdates));
   }
 
-  @VisibleForTesting
-  public synchronized List<ReplicateRefUpdate> listBuilding() {
-    return list(createDir(buildingUpdates));
-  }
-
-  @VisibleForTesting
-  public synchronized List<ReplicateRefUpdate> list() {
-    return list(createDir(refUpdates));
-  }
-
   private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
     try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -207,16 +192,13 @@
     }
   }
 
-  private class Task {
+  @VisibleForTesting
+  class Task {
     public final ReplicateRefUpdate update;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
-    public Task(PushOne push, String ref) {
-      this(new ReplicateRefUpdate(push, ref));
-    }
-
     public Task(ReplicateRefUpdate update) {
       this.update = update;
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
@@ -269,15 +251,6 @@
       }
     }
 
-    public void delete() {
-      try {
-        Files.deleteIfExists(waiting);
-        Files.deleteIfExists(running);
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
-      }
-    }
-
     private void rename(Path from, Path to) {
       try {
         logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
new file mode 100644
index 0000000..9c56c8e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.entities.Project;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.transport.URIish;
+
+/** A data container for the ref updates of one project destined for a single remote URI. */
+public interface UriUpdates {
+  Project.NameKey getProjectNameKey();
+
+  URIish getURI();
+
+  String getRemoteName();
+
+  Set<String> getRefs();
+
+  default List<ReplicationTasksStorage.ReplicateRefUpdate> getReplicateRefUpdates() {
+    return getRefs().stream()
+        .map(
+            (ref) ->
+                new ReplicationTasksStorage.ReplicateRefUpdate(
+                    getProjectNameKey().get(), ref, getURI(), getRemoteName()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 8291421..e12ec92 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -11,8 +11,7 @@
 ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
   [--now]
   [--wait]
-  [--url <PATTERN>]
-  {--all | <PROJECT PATTERN> ...}
+  {--url <PATTERN> | [--url <PATTERN>] --all | [--url <PATTERN>] <PROJECT PATTERN> ...}
 ```
 
 DESCRIPTION
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index adb7fc8..f06b933 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -42,6 +42,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
@@ -108,7 +109,7 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(newBranch).create(input);
 
-    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+    assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
 
     try (Repository repo = repoManager.openRepository(targetProject);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -134,7 +135,7 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
 
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
@@ -166,7 +167,7 @@
 
     createChange();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
 
     setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
     setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
@@ -241,11 +242,15 @@
     return projectOperations.newProject().name(name).create();
   }
 
-  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+  @SuppressWarnings(
+      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
+  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
-        .collect(toList());
+    synchronized (tasksStorage) {
+      return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream())
+          .filter(task -> refmaskPattern.matcher(task.ref).matches())
+          .collect(toList());
+    }
   }
 
   public void cleanupReplicationTasks() throws IOException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 08b8c65..4fb0387 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -39,6 +39,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -117,7 +118,7 @@
 
     Project.NameKey sourceProject = createTestProject("foo");
 
-    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
+    assertThat(listIncompleteTasks("refs/meta/config")).hasSize(1);
 
     waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
 
@@ -164,7 +165,7 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -187,7 +188,7 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(newBranch).create(input);
 
-    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+    assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
 
     try (Repository repo = repoManager.openRepository(targetProject);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -213,7 +214,7 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
 
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
@@ -245,7 +246,7 @@
 
     createChange();
 
-    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
 
     setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
     setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
@@ -263,7 +264,7 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
 
-    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
   }
 
   @Test
@@ -281,10 +282,8 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-    }
+    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
   }
 
   @Test
@@ -302,11 +301,8 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listReplicationTasks(".*")).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.list()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-    }
-    assertThat(tasksStorage.list()).isNotEmpty();
+    assertThat(listIncompleteTasks()).hasSize(1);
+    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
   }
 
   @Test
@@ -354,7 +350,7 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(branchToDelete).create(input);
 
-    assertThat(listReplicationTasks("refs/heads/(todelete|master)")).hasSize(2);
+    assertThat(listIncompleteTasks("refs/heads/(todelete|master)")).hasSize(2);
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, branchToDelete) != null);
@@ -362,7 +358,7 @@
 
     gApi.projects().name(project.get()).branch(branchToDelete).delete();
 
-    assertThat(listReplicationTasks(branchToDelete)).hasSize(1);
+    assertThat(listIncompleteTasks(branchToDelete)).hasSize(1);
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       if (mirror) {
@@ -515,7 +511,7 @@
   }
 
   @Test
-  public void shouldFirePendingOnlyToStoredUri() throws Exception {
+  public void shouldFirePendingOnlyToRemainingUris() throws Exception {
     String suffix1 = "replica1";
     String suffix2 = "replica2";
     Project.NameKey target1 = createTestProject(project + suffix1);
@@ -529,7 +525,16 @@
     String changeRef = createChange().getPatchSet().refName();
 
     tasksStorage.disableDeleteForTesting(false);
-    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+              }
+            });
     tasksStorage.disableDeleteForTesting(true);
 
     setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
@@ -657,7 +662,12 @@
 
   private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
       String changeRef, String remote) {
-    return tasksStorage.list().stream()
+    return changeReplicationTasksForRemote(streamIncompleteTasks(), changeRef, remote);
+  }
+
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
+    return updates
         .filter(task -> changeRef.equals(task.ref))
         .filter(task -> remote.equals(task.remote));
   }
@@ -666,13 +676,26 @@
     return projectOperations.newProject().name(name).create();
   }
 
-  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
+    return streamIncompleteTasks()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }
 
+  private List<ReplicateRefUpdate> listIncompleteTasks() {
+    return streamIncompleteTasks().collect(toList());
+  }
+
+  @SuppressWarnings(
+      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
+  private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
+    synchronized (tasksStorage) {
+      return Stream.concat(
+          tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream());
+    }
+  }
+
   public void cleanupReplicationTasks() throws IOException {
     cleanupReplicationTasks(storagePath);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
new file mode 100644
index 0000000..ca69644
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -0,0 +1,380 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.Task;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.Files;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageTaskTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+
+  protected ReplicationTasksStorage tasksStorage;
+  protected FileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix());
+    tasksStorage = new ReplicationTasksStorage(fileSystem.getPath("replication_site"));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fileSystem.close();
+  }
+
+  @Test
+  public void createdTaskIsWaiting() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+
+    original.create();
+    assertIsWaiting(original);
+    assertIsWaiting(persistedView);
+  }
+
+  @Test
+  public void startedTaskIsRunning() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    assertNotWaiting(original);
+    assertIsRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+    assertIsRunning(persistedView);
+  }
+
+  @Test
+  public void resetTaskIsWaiting() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+    assertIsWaiting(original);
+    assertNotRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertIsWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void finishedTaskIsRemoved() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.finish();
+    assertNotWaiting(original);
+    assertNotRunning(original);
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    assertNotWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void createWaitingTaskIsDeduped() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    String key = original.create();
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    assertEquals(secondUpdate.create(), key);
+    assertIsWaiting(secondUpdate);
+    assertIsWaiting(original);
+  }
+
+  @Test
+  public void createWaitingTaskWhileTaskIsRunning() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    original.create();
+    original.start();
+    assertIsRunning(original);
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    secondUpdate.create();
+
+    assertIsWaiting(secondUpdate);
+    assertIsRunning(original);
+  }
+
+  @Test
+  public void canCompleteTwoTasks() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+    original.create();
+    original.start();
+    original.finish();
+
+    Task secondUpdate = tasksStorage.new Task(REF_UPDATE);
+    secondUpdate.create();
+    assertIsWaiting(secondUpdate);
+    secondUpdate.start();
+    assertIsRunning(secondUpdate);
+    secondUpdate.finish();
+    assertNotWaiting(secondUpdate);
+    assertNotRunning(secondUpdate);
+  }
+
+  @Test
+  public void canStartResetTask() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.start();
+    assertIsRunning(persistedView);
+    assertIsRunning(original);
+  }
+
+  @Test
+  public void canResetPreviouslyResetAndStartedTask() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.start();
+    persistedView.reset();
+    assertIsWaiting(persistedView);
+    assertIsWaiting(original);
+  }
+
+  @Test
+  public void multipleActorsCanUpdateSameTask() throws Exception {
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+
+    Task fromEvent = tasksStorage.new Task(REF_UPDATE);
+    fromEvent.create();
+    assertIsWaiting(persistedView);
+
+    Task fromPusherStart = tasksStorage.new Task(REF_UPDATE);
+    fromPusherStart.start();
+    assertIsRunning(persistedView);
+
+    Task fromPusherEnd = tasksStorage.new Task(REF_UPDATE);
+    fromPusherEnd.finish();
+    assertNotWaiting(persistedView);
+    assertNotRunning(persistedView);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentProjects() throws Exception {
+    Task updateA =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+    Task updateB =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentProjects() throws Exception {
+    Task updateA =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+    Task updateB =
+        tasksStorage
+        .new Task(
+            new ReplicateRefUpdate(
+                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    updateA.create();
+    updateB.create();
+    assertIsWaiting(updateA);
+    assertIsWaiting(updateB);
+  }
+
+  @Test
+  public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
+    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    updateA.create();
+    updateB.create();
+    updateA.start();
+    updateB.start();
+    assertIsRunning(updateA);
+    assertIsRunning(updateB);
+  }
+
+  @Test
+  public void illegalFinishNonRunningTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+    task.finish();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalResetNonRunningTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+    task.reset();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalResetFinishedTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+
+    task.create();
+    task.start();
+    task.finish();
+    task.reset();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalFinishFinishedTaskIsGraceful() throws Exception {
+    Task task = tasksStorage.new Task(REF_UPDATE);
+
+    task.create();
+    task.start();
+    task.finish();
+    task.finish();
+    assertNotWaiting(task);
+    assertNotRunning(task);
+  }
+
+  @Test
+  public void illegalFinishResetTaskIsGraceful() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.finish();
+    assertIsWaiting(persistedView);
+  }
+
+  @Test
+  public void illegalResetResetTaskIsGraceful() throws Exception {
+    Task original = tasksStorage.new Task(REF_UPDATE);
+
+    original.create();
+    original.start();
+    original.reset();
+
+    Task persistedView = tasksStorage.new Task(REF_UPDATE);
+    persistedView.reset();
+    assertIsWaiting(persistedView);
+  }
+
+  private void assertIsWaiting(Task task) {
+    assertTrue(whiteBoxIsWaiting(task));
+  }
+
+  private void assertNotWaiting(Task task) {
+    assertFalse(whiteBoxIsWaiting(task));
+  }
+
+  private void assertIsRunning(Task task) {
+    assertTrue(whiteBoxIsRunning(task));
+  }
+
+  private void assertNotRunning(Task task) {
+    assertFalse(whiteBoxIsRunning(task));
+  }
+
+  private boolean whiteBoxIsRunning(Task task) {
+    return Files.exists(task.running);
+  }
+
+  private boolean whiteBoxIsWaiting(Task task) {
+    return Files.exists(task.waiting);
+  }
+
+  public static URIish getUrish(String uri) {
+    try {
+      return new URIish(uri);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Cannot instantiate URIish object", e);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
new file mode 100644
index 0000000..0a92c76
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -0,0 +1,386 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Objects;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+
+  protected ReplicationTasksStorage storage;
+  protected FileSystem fileSystem;
+  protected Path storageSite;
+  protected UriUpdates uriUpdates;
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix()); // new Jimfs file system per test
+    storageSite = fileSystem.getPath("replication_site");
+    storage = new ReplicationTasksStorage(storageSite); // storage is given a path inside it
+    uriUpdates = TestUriUpdates.create(REF_UPDATE); // UriUpdates built from REF_UPDATE
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fileSystem.close(); // closes the file system opened in setUp
+  }
+
+  @Test
+  public void canListEmptyStorage() throws Exception {
+    // Nothing has been created yet, so neither listing may return an entry.
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canListWaitingUpdate() throws Exception {
+    storage.create(REF_UPDATE); // create() alone must make the update show up as waiting
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+  }
+
+  @Test
+  public void canStartWaitingUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates); // must move the update from the waiting to the running list
+    assertThat(storage.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+  }
+
+  @Test
+  public void canFinishRunningUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.finish(uriUpdates); // afterwards the update is neither waiting nor running
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
+    ReplicationTasksStorage otherInstance = new ReplicationTasksStorage(storageSite);
+    // Both instances share storageSite; each step below is checked through both.
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(otherInstance.listWaiting()).isEmpty();
+    // create(): listed as waiting by both.
+    storage.create(REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertContainsExactly(otherInstance.listWaiting(), REF_UPDATE);
+    // start(): no longer waiting, now running, in both.
+    storage.start(uriUpdates);
+    assertThat(storage.listWaiting()).isEmpty();
+    assertThat(otherInstance.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertContainsExactly(otherInstance.listRunning(), REF_UPDATE);
+    // finish(): no longer running in either.
+    storage.finish(uriUpdates);
+    assertThat(storage.listRunning()).isEmpty();
+    assertThat(otherInstance.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void sameRefUpdateCreatedTwiceIsStoredOnce() throws Exception {
+    String firstKey = storage.create(REF_UPDATE);
+    String repeatedKey = storage.create(REF_UPDATE); // same update again: same key expected
+    assertEquals(firstKey, repeatedKey);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+  }
+
+  @Test
+  public void canCreateDifferentUris() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    // Same project, ref and remote as REF_UPDATE; only the URI scheme differs.
+    String httpKey = storage.create(REF_UPDATE);
+    String sshKey = storage.create(sshUpdate);
+    assertThat(storage.listWaiting()).hasSize(2);
+    assertNotEquals(httpKey, sshKey);
+  }
+
+  @Test
+  public void canStartDifferentUris() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates sshUriUpdates = TestUriUpdates.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    // Starting the http update must leave the ssh one waiting.
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listWaiting(), sshUpdate);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    // Starting the ssh update as well leaves nothing waiting.
+    storage.start(sshUriUpdates);
+    assertThat(storage.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE, sshUpdate);
+  }
+
+  @Test
+  public void canFinishDifferentUris() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates sshUriUpdates = TestUriUpdates.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    storage.start(uriUpdates);
+    storage.start(sshUriUpdates);
+    // Finishing the http update must leave the ssh one running.
+    storage.finish(uriUpdates);
+    assertContainsExactly(storage.listRunning(), sshUpdate);
+    // Finishing the ssh update empties the running list.
+    storage.finish(sshUriUpdates);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void differentUrisCreatedTwiceIsStoredOnce() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    // Create each of the two updates twice, interleaved; only two entries may be listed.
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    assertThat(storage.listWaiting()).hasSize(2);
+  }
+
+  @Test
+  public void canCreateMulipleRefsForSameUri() throws Exception {
+    ReplicateRefUpdate firstRef = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
+    ReplicateRefUpdate secondRef = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+    // Only the ref differs, yet each update gets its own key and its own waiting entry.
+    String firstKey = storage.create(firstRef);
+    String secondKey = storage.create(secondRef);
+    assertThat(storage.listWaiting()).hasSize(2);
+    assertNotEquals(firstKey, secondKey);
+  }
+
+  @Test
+  public void canFinishMulipleRefsForSameUri() throws Exception {
+    ReplicateRefUpdate updateForRefA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
+    ReplicateRefUpdate updateForRefB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+    UriUpdates uriUpdatesForRefA = TestUriUpdates.create(updateForRefA);
+    UriUpdates uriUpdatesForRefB = TestUriUpdates.create(updateForRefB);
+    storage.create(updateForRefA);
+    storage.create(updateForRefB);
+    storage.start(uriUpdatesForRefA);
+    storage.start(uriUpdatesForRefB);
+    // Finishing refA must leave refB running.
+    storage.finish(uriUpdatesForRefA);
+    assertContainsExactly(storage.listRunning(), updateForRefB);
+    // Finishing refB empties the running list.
+    storage.finish(uriUpdatesForRefB);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void canResetUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    // reset() on a running update must list it as waiting again and no longer as running.
+    storage.reset(uriUpdates);
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void canCompleteResetUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.reset(uriUpdates);
+    // A reset update can be started a second time...
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThat(storage.listWaiting()).isEmpty();
+    // ...and then finished, leaving nothing waiting or running.
+    storage.finish(uriUpdates);
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canResetAllEmpty() throws Exception {
+    storage.resetAll(); // nothing has been created or started at this point
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canResetAllUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    // resetAll() must list the running update as waiting again and no longer as running.
+    storage.resetAll();
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  @Test
+  public void canCompleteResetAllUpdate() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.resetAll();
+    // After resetAll() the update can be started a second time...
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThat(storage.listWaiting()).isEmpty();
+    // ...and then finished, leaving nothing waiting or running.
+    storage.finish(uriUpdates);
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test
+  public void canResetAllMultipleUpdates() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates sshUriUpdates = TestUriUpdates.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    storage.start(uriUpdates);
+    storage.start(sshUriUpdates);
+    // With both updates running, resetAll() must put both back in the waiting list.
+    storage.resetAll();
+    assertContainsExactly(storage.listWaiting(), REF_UPDATE, sshUpdate);
+  }
+
+  @Test
+  public void canCompleteMultipleResetAllUpdates() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates sshUriUpdates = TestUriUpdates.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    storage.start(uriUpdates);
+    storage.start(sshUriUpdates);
+    storage.resetAll();
+    // After resetAll(), restarting the http update leaves only the ssh one waiting.
+    storage.start(uriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertContainsExactly(storage.listWaiting(), sshUpdate);
+    // Restarting the ssh update too leaves nothing waiting.
+    storage.start(sshUriUpdates);
+    assertContainsExactly(storage.listRunning(), REF_UPDATE, sshUpdate);
+    assertThat(storage.listWaiting()).isEmpty();
+    // Both restarted updates can then be finished, leaving the storage empty.
+    storage.finish(uriUpdates);
+    storage.finish(sshUriUpdates);
+    assertNoIncompleteTasks(storage);
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalFinishUncreatedIsGraceful() throws Exception {
+    storage.finish(uriUpdates); // never created or started; must simply not throw
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalDoubleFinishIsGraceful() throws Exception {
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    storage.finish(uriUpdates);
+    // The update is already finished; a second finish() must simply not throw.
+    storage.finish(uriUpdates);
+  }
+
+  @Test(expected = Test.None.class /* no exception expected */)
+  public void illegalDoubleFinishDifferentUriIsGraceful() throws Exception {
+    ReplicateRefUpdate sshUpdate =
+        new ReplicateRefUpdate(
+            PROJECT,
+            REF,
+            getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
+            REMOTE);
+    UriUpdates sshUriUpdates = TestUriUpdates.create(sshUpdate);
+    storage.create(REF_UPDATE);
+    storage.create(sshUpdate);
+    storage.start(uriUpdates);
+    storage.start(sshUriUpdates);
+    storage.finish(uriUpdates);
+    storage.finish(sshUriUpdates);
+    // Finishing both again must neither throw nor bring a running entry back.
+    storage.finish(uriUpdates);
+    storage.finish(sshUriUpdates);
+    assertThat(storage.listRunning()).isEmpty();
+  }
+
+  private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
+    assertThat(storage.listWaiting()).isEmpty(); // no update is waiting
+    assertThat(storage.listRunning()).isEmpty(); // no update is running
+  }
+
+  private void assertContainsExactly(
+      List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) {
+    assertThat(all).hasSize(refUpdates.length);
+    for (ReplicateRefUpdate expected : refUpdates) { // matched by content, not by position
+      assertTrue(all.stream().anyMatch(actual -> equals(actual, expected)));
+    }
+  }
+
+  private boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
+    if (one == null || two == null) {
+      return one == two; // equal only when both are null
+    }
+    return Objects.equals(one.project, two.project)
+        && Objects.equals(one.ref, two.ref)
+        && Objects.equals(one.remote, two.remote)
+        && Objects.equals(one.uri, two.uri);
+  }
+
+  public static URIish getUrish(String uri) {
+    try {
+      return new URIish(uri);
+    } catch (URISyntaxException e) { // rethrown unchecked so static fixtures can call this
+      throw new IllegalArgumentException("Cannot instantiate URIish object from " + uri, e);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
new file mode 100644
index 0000000..2fd3ee3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.auto.value.AutoValue;
+import com.google.gerrit.entities.Project;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Set;
+import org.eclipse.jgit.transport.URIish;
+
+/** {@link UriUpdates} value class for tests; the implementation is generated by AutoValue. */
+@AutoValue
+public abstract class TestUriUpdates implements UriUpdates {
+  /** Builds an instance from one update; its single ref becomes the whole ref set. */
+  public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
+    Set<String> onlyRef = Collections.singleton(update.ref);
+    return create(Project.nameKey(update.project), new URIish(update.uri), update.remote, onlyRef);
+  }
+
+  /** Builds an instance directly from the four values exposed by the accessors below. */
+  public static TestUriUpdates create(
+      Project.NameKey project, URIish uri, String remote, Set<String> refs) {
+    return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+  }
+
+  @Override
+  public abstract Project.NameKey getProjectNameKey();
+
+  @Override
+  public abstract URIish getURI();
+
+  @Override
+  public abstract String getRemoteName();
+
+  @Override
+  public abstract Set<String> getRefs();
+}