Merge branch 'stable-3.3'

* stable-3.3:
  Remove dependency on commons-io library
  Fix replication to retry on lock errors

Change-Id: I52fea645ccb55c1380702c071f1c7f7f134dc1b2
diff --git a/BUILD b/BUILD
index df6a2c0..2a6dfd5 100644
--- a/BUILD
+++ b/BUILD
@@ -14,6 +14,9 @@
         "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.SshModule",
     ],
     resources = glob(["src/main/resources/**/*"]),
+    deps = [
+        "//lib/auto:auto-value-gson",
+    ],
 )
 
 junit_tests(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java
new file mode 100644
index 0000000..5b27176
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gson.TypeAdapterFactory;
+import com.ryanharter.auto.value.gson.GsonTypeAdapterFactory;
+
+@GsonTypeAdapterFactory
+public abstract class AutoValueTypeAdapterFactory implements TypeAdapterFactory {
+  public static TypeAdapterFactory create() {
+    return new AutoValueGson_AutoValueTypeAdapterFactory();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
new file mode 100644
index 0000000..89b97e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -0,0 +1,179 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Stream;
+
+/**
+ * Non-greedily schedules consecutive tasks in a executor, one for each item returned by an {@link
+ * Iterator}.
+ *
+ * <p>This scheduler is useful when an {@link Iterator} might provide a large amount of items to be
+ * worked on in a non prioritized fashion. This scheduler will scavenge any unused threads in its
+ * executor to do work, however only one item will ever be outstanding and waiting at a time. This
+ * scheduling policy allows large collections to be processed without interfering much with higher
+ * prioritized tasks while still making regular progress on the items provided by the {@link
+ * Iterator}. To keep the level of interference to a minimum, ensure that the amount of work needed
+ * for each item is small and short.
+ */
+public class ChainedScheduler<T> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /**
+   * Override to implement a common thread pool task for all the items returned by the {@link
+   * Iterator}
+   */
+  public interface Runner<T> {
+    /** Will get executed in the thread pool task for each item */
+    void run(T item);
+
+    /** Will get called after the last item completes */
+    default void onDone() {}
+
+    /** Will get called to display {@link Runnable} for item in show-queue output */
+    default String toString(T item) {
+      return "Chained " + item.toString();
+    }
+  }
+
+  /** Override to decorate an existing {@link Runner} */
+  public static class ForwardingRunner<T> implements Runner<T> {
+    protected Runner<T> delegateRunner;
+
+    public ForwardingRunner(Runner<T> delegate) {
+      delegateRunner = delegate;
+    }
+
+    @Override
+    public void run(T item) {
+      delegateRunner.run(item);
+    }
+
+    @Override
+    public void onDone() {
+      delegateRunner.onDone();
+    }
+
+    @Override
+    public String toString(T item) {
+      return delegateRunner.toString(item);
+    }
+  }
+
+  /**
+   * Use when a {@link Stream} is needed instead of an {@link Iterator}, it will close the {@link
+   * Stream}
+   */
+  public static class StreamScheduler<T> extends ChainedScheduler<T> {
+    public StreamScheduler(
+        ScheduledExecutorService threadPool, final Stream<T> stream, Runner<T> runner) {
+      super(
+          threadPool,
+          stream.iterator(),
+          new ForwardingRunner<T>(runner) {
+            @Override
+            public void onDone() {
+              stream.close();
+              super.onDone();
+            }
+          });
+    }
+  }
+
+  /** Internal {@link Runnable} containing one item to run and which schedules the next one. */
+  protected class Chainer implements Runnable {
+    protected T item;
+
+    public Chainer(T item) {
+      this.item = item;
+    }
+
+    @Override
+    public void run() {
+      boolean scheduledNext = scheduleNext();
+      try {
+        runner.run(item);
+      } catch (RuntimeException e) { // catch to prevent chain from breaking
+        logger.atSevere().withCause(e).log("Error while running: " + item);
+      }
+      if (!scheduledNext) {
+        runner.onDone();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return runner.toString(item);
+    }
+  }
+
+  protected final ScheduledExecutorService threadPool;
+  protected final Iterator<T> iterator;
+  protected final Runner<T> runner;
+
+  /**
+   * Note: The {@link Iterator} passed in will only ever be accessed from one thread at a time, and
+   * the internal state of the {@link Iterator} will be updated after each next() call before
+   * operating on the iterator again.
+   */
+  public ChainedScheduler(
+      ScheduledExecutorService threadPool, Iterator<T> iterator, Runner<T> runner) {
+    this.threadPool = threadPool;
+    this.iterator = iterator;
+    this.runner = runner;
+
+    if (!scheduleNext()) {
+      runner.onDone();
+    }
+  }
+
+  /**
+   * Concurrency note:
+   *
+   * <p>Since there is only one chain of tasks and each task submits the next task to the executor,
+   * the calls from here to the iterator.next() call will never be executed concurrently by more
+   * than one thread.
+   *
+   * <p>Data synchronization note:
+   *
+   * <p>This section in the [1] javadoc: "Actions in a thread prior to the submission of a Runnable
+   * to an Executor happen-before its execution begins..." guarantees that since the iterator.next()
+   * happens before the schedule(), and the new tasks call to hasNext() happen after the submission,
+   * that the hasNext() call will see the results of the previous next() call even though it may
+   * have happened on a different thread.
+   *
+   * <p>[1]
+   * <li>https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
+   *     (section "Memory Consistency Properties")
+   * <li>The methods of all classes in java.util.concurrent and its subpackages extends the
+   *     guarantees of the java memory model to higher-level synchronization.
+   * <li>In particular this guarantee of the java.util.concurrent applies here:
+   */
+  protected boolean scheduleNext() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+
+    schedule(new Chainer(iterator.next()));
+    return true;
+  }
+
+  protected void schedule(Runnable r) {
+    threadPool.execute(r);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
index 431c7d2..c3556af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
@@ -28,4 +28,10 @@
   public GroupMembership getEffectiveGroups() {
     return effectiveGroups;
   }
+
+  @Override
+  public Object getCacheKey() {
+    // Never cache a remote user
+    return new Object();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 990e387..66be6dd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -86,10 +86,8 @@
     if (!running) {
       destinations.get().startup(workQueue);
       running = true;
-      replicationTasksStorage.resetAll();
-      Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
-      t.setDaemon(true);
-      t.start();
+      replicationTasksStorage.recoverAll();
+      firePendingEvents();
       fireBeforeStartupEvents();
       distributor = new Distributor(workQueue);
     }
@@ -177,7 +175,7 @@
     if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
       for (URIish uri : cfg.getURIs(project, urlMatch)) {
         replicationTasksStorage.create(
-            new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+            ReplicateRefUpdate.create(project.get(), refName, uri, cfg.getRemoteConfigName()));
         cfg.schedule(project, refName, uri, state, now);
       }
     } else {
@@ -190,20 +188,32 @@
 
   private void firePendingEvents() {
     replaying = true;
-    try {
-      replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
-        try {
-          fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
-        } catch (URISyntaxException e) {
-          repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
-        }
-      }
-    } catch (Throwable e) {
-      repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
-    } finally {
-      replaying = false;
-    }
+    new ChainedScheduler.StreamScheduler<>(
+        workQueue.getDefaultQueue(),
+        replicationTasksStorage.streamWaiting(),
+        new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
+          @Override
+          public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
+            try {
+              fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+            } catch (URISyntaxException e) {
+              repLog.atSevere().withCause(e).log(
+                  "Encountered malformed URI for persisted event %s", u);
+            } catch (Throwable e) {
+              repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
+            }
+          }
+
+          @Override
+          public void onDone() {
+            replaying = false;
+          }
+
+          @Override
+          public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
+            return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+          }
+        });
   }
 
   private void pruneCompleted() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3947ebc..d6cc8be 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -16,10 +16,13 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.hash.Hashing;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
 import com.google.inject.Inject;
 import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
@@ -29,9 +32,7 @@
 import java.nio.file.NotDirectoryException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
-import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
@@ -61,10 +62,11 @@
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  public static class ReplicateRefUpdate {
-    public static Optional<ReplicateRefUpdate> createOptionally(Path file) {
+  @AutoValue
+  public abstract static class ReplicateRefUpdate {
+    public static Optional<ReplicateRefUpdate> createOptionally(Path file, Gson gson) {
       try {
-        return Optional.ofNullable(create(file));
+        return Optional.ofNullable(create(file, gson));
       } catch (NoSuchFileException e) {
         logger.atFine().log("File %s not found while reading task", file);
       } catch (IOException e) {
@@ -73,30 +75,35 @@
       return Optional.empty();
     }
 
-    public static ReplicateRefUpdate create(Path file) throws IOException {
+    public static ReplicateRefUpdate create(Path file, Gson gson) throws IOException {
       String json = new String(Files.readAllBytes(file), UTF_8);
-      return GSON.fromJson(json, ReplicateRefUpdate.class);
+      return gson.fromJson(json, ReplicateRefUpdate.class);
     }
 
-    public final String project;
-    public final String ref;
-    public final String uri;
-    public final String remote;
-
-    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
-      this.ref = ref;
-      this.uri = uri.toASCIIString();
-      this.remote = remote;
+    public static ReplicateRefUpdate create(String project, String ref, URIish uri, String remote) {
+      return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
+          project, ref, uri.toASCIIString(), remote);
     }
 
+    public abstract String project();
+
+    public abstract String ref();
+
+    public abstract String uri();
+
+    public abstract String remote();
+
     @Override
-    public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+    public final String toString() {
+      return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
+    }
+
+    public static TypeAdapter<ReplicateRefUpdate> typeAdapter(Gson gson) {
+      return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate.GsonTypeAdapter(gson);
     }
   }
 
-  private static final Gson GSON = new Gson();
+  private final Gson gson;
 
   private final Path buildingUpdates;
   private final Path runningUpdates;
@@ -112,6 +119,8 @@
     buildingUpdates = refUpdates.resolve("building");
     runningUpdates = refUpdates.resolve("running");
     waitingUpdates = refUpdates.resolve("waiting");
+    gson =
+        new GsonBuilder().registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create()).create();
   }
 
   public synchronized String create(ReplicateRefUpdate r) {
@@ -130,10 +139,8 @@
     }
   }
 
-  public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
-      new Task(r).reset();
-    }
+  public synchronized void recoverAll() {
+    streamRunning().forEach(r -> new Task(r).recover());
   }
 
   public boolean isWaiting(UriUpdates uriUpdates) {
@@ -148,21 +155,17 @@
     }
   }
 
-  public List<ReplicateRefUpdate> listWaiting() {
-    return list(createDir(waitingUpdates));
+  public Stream<ReplicateRefUpdate> streamWaiting() {
+    return streamRecursive(createDir(waitingUpdates));
   }
 
-  public List<ReplicateRefUpdate> listRunning() {
-    return list(createDir(runningUpdates));
-  }
-
-  private List<ReplicateRefUpdate> list(Path taskDir) {
-    return streamRecursive(taskDir).collect(Collectors.toList());
+  public Stream<ReplicateRefUpdate> streamRunning() {
+    return streamRecursive(createDir(runningUpdates));
   }
 
   private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
     return walkNonDirs(dir)
-        .map(path -> ReplicateRefUpdate.createOptionally(path))
+        .map(path -> ReplicateRefUpdate.createOptionally(path, gson))
         .filter(Optional::isPresent)
         .map(Optional::get);
   }
@@ -200,7 +203,8 @@
 
     public Task(ReplicateRefUpdate update) {
       this.update = update;
-      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      String key =
+          update.project() + "\n" + update.ref() + "\n" + update.uri() + "\n" + update.remote();
       taskKey = sha1(key).name();
       running = createDir(runningUpdates).resolve(taskKey);
       waiting = createDir(waitingUpdates).resolve(taskKey);
@@ -211,7 +215,7 @@
         return taskKey;
       }
 
-      String json = GSON.toJson(update) + "\n";
+      String json = gson.toJson(update) + "\n";
       try {
         Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
         logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -232,6 +236,10 @@
       rename(running, waiting);
     }
 
+    public void recover() {
+      rename(running, waiting);
+    }
+
     public boolean isWaiting() {
       return Files.exists(waiting);
     }
@@ -255,7 +263,7 @@
     }
 
     private String updateLog() {
-      return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
+      return String.format("(%s:%s => %s)", update.project(), update.ref(), update.uri());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
index 9c56c8e..a9985d2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -34,7 +34,7 @@
     return getRefs().stream()
         .map(
             (ref) ->
-                new ReplicationTasksStorage.ReplicateRefUpdate(
+                ReplicationTasksStorage.ReplicateRefUpdate.create(
                     getProjectNameKey().get(), ref, getURI(), getRemoteName()))
         .collect(Collectors.toList());
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
new file mode 100644
index 0000000..90191f2
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -0,0 +1,463 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChainedSchedulerTest {
+  /** A simple {@link Runnable} that waits until the start() method is called. */
+  public class WaitingRunnable implements Runnable {
+    protected final CountDownLatch start;
+
+    public WaitingRunnable() {
+      this(new CountDownLatch(1));
+    }
+
+    public WaitingRunnable(CountDownLatch latch) {
+      this.start = latch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        start.await();
+      } catch (InterruptedException e) {
+        missedAwaits.incrementAndGet();
+      }
+    }
+
+    public void start() {
+      start.countDown();
+    }
+  }
+
+  /** A simple {@link Runnable} that can be awaited to start to run. */
+  public static class WaitableRunnable implements Runnable {
+    protected final CountDownLatch started = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      started.countDown();
+    }
+
+    public boolean isStarted() {
+      return started.getCount() == 0;
+    }
+
+    public boolean awaitStart(int itemsBefore) {
+      try {
+        return started.await(SECONDS_SYNCHRONIZE * itemsBefore, SECONDS);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+  }
+
+  /** An {@link Iterator} wrapper which keeps track of how many times next() has been called. */
+  public static class CountingIterator extends ForwardingIterator<String> {
+    public volatile int count = 0;
+
+    protected Iterator<String> delegate;
+
+    public CountingIterator(Iterator<String> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public synchronized String next() {
+      count++;
+      return super.next();
+    }
+
+    @Override
+    protected Iterator<String> delegate() {
+      return delegate;
+    }
+  }
+
+  /** A {@link ChainedScheduler.Runner} which keeps track of completion and counts. */
+  public static class TestRunner implements ChainedScheduler.Runner<String> {
+    protected final AtomicInteger runCount = new AtomicInteger(0);
+    protected final CountDownLatch onDone = new CountDownLatch(1);
+
+    @Override
+    public void run(String item) {
+      incrementAndGet();
+    }
+
+    public int runCount() {
+      return runCount.get();
+    }
+
+    @Override
+    public void onDone() {
+      onDone.countDown();
+    }
+
+    public boolean isDone() {
+      return onDone.getCount() <= 0;
+    }
+
+    public boolean awaitDone(int items) {
+      try {
+        return onDone.await(items * SECONDS_SYNCHRONIZE, SECONDS);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    protected int incrementAndGet() {
+      return runCount.incrementAndGet();
+    }
+  }
+
+  /**
+   * A {@link TestRunner} that can be awaited to start to run and additionally will wait until
+   * increment() or runOnewRandomStarted() is called to complete.
+   */
+  public class WaitingRunner extends TestRunner {
+    protected class RunContext extends WaitableRunnable {
+      CountDownLatch run = new CountDownLatch(1);
+      CountDownLatch ran = new CountDownLatch(1);
+      int count;
+
+      @Override
+      public void run() {
+        super.run();
+        try {
+          run.await();
+          count = incrementAndGet();
+          ran.countDown();
+        } catch (InterruptedException e) {
+          missedAwaits.incrementAndGet();
+        }
+      }
+
+      public synchronized boolean startIfNotRunning() throws InterruptedException {
+        if (run.getCount() > 0) {
+          increment();
+          return true;
+        }
+        return false;
+      }
+
+      public synchronized int increment() throws InterruptedException {
+        run.countDown();
+        ran.await(); // no timeout needed as RunContext.run() calls countDown unless interrupted
+        return count;
+      }
+    }
+
+    protected final Map<String, RunContext> ctxByItem = new ConcurrentHashMap<>();
+
+    @Override
+    public void run(String item) {
+      context(item).run();
+    }
+
+    public void runOneRandomStarted() throws InterruptedException {
+      while (true) {
+        for (RunContext ctx : ctxByItem.values()) {
+          if (ctx.isStarted()) {
+            if (ctx.startIfNotRunning()) {
+              return;
+            }
+          }
+        }
+        MILLISECONDS.sleep(1);
+      }
+    }
+
+    public boolean awaitStart(String item, int itemsBefore) {
+      return context(item).awaitStart(itemsBefore);
+    }
+
+    public int increment(String item) throws InterruptedException {
+      return increment(item, 1);
+    }
+
+    public int increment(String item, int itemsBefore) throws InterruptedException {
+      awaitStart(item, itemsBefore);
+      return context(item).increment();
+    }
+
+    protected RunContext context(String item) {
+      return ctxByItem.computeIfAbsent(item, k -> new RunContext());
+    }
+  }
+
+  // Time for one synchronization event such as await(), or variable
+  // incrementing across threads to take
+  public static final int SECONDS_SYNCHRONIZE = 3;
+  public static final int MANY_ITEMS_SIZE = 1000;
+
+  public static String FIRST = item(1);
+  public static String SECOND = item(2);
+  public static String THIRD = item(3);
+
+  public final AtomicInteger missedAwaits = new AtomicInteger(0); // non-zero signals an error
+
+  @Before
+  public void setup() {
+    missedAwaits.set(0);
+  }
+
+  @Test
+  public void emptyCompletesImmediately() {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = new ArrayList<>();
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(1)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void oneItemCompletes() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(1)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void manyItemsAllComplete() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void exceptionInTaskDoesNotAbortIteration() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner =
+        new TestRunner() {
+          @Override
+          public void run(String item) {
+            super.run(item);
+            throw new RuntimeException();
+          }
+        };
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+    items.add(SECOND);
+    items.add(THIRD);
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void onDoneNotCalledBeforeAllCompleted() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+    for (int i = 1; i <= items.size(); i++) {
+      assertThat(runner.isDone()).isEqualTo(false);
+      runner.runOneRandomStarted();
+    }
+
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void otherTasksOnlyEverWaitForAtMostOneRunningPlusOneWaiting() throws Exception {
+    for (int threads = 1; threads <= 10; threads++) {
+      ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+      WaitingRunner runner = new WaitingRunner();
+      List<String> items = createItems(threads + 1 /* Running */ + 1 /* Waiting */);
+      CountingIterator it = new CountingIterator(items.iterator());
+
+      new ChainedScheduler<>(executor, it, runner);
+      assertThat(runner.awaitStart(FIRST, 1)).isEqualTo(true); // Confirms at least one Running
+      assertThat(it.count).isGreaterThan(1); // Confirms at least one extra Waiting or Running
+      assertThat(it.count).isLessThan(items.size()); // Confirms at least one still not queued
+
+      WaitableRunnable external = new WaitableRunnable();
+      executor.execute(external);
+      assertThat(external.isStarted()).isEqualTo(false);
+
+      // Completes 2, (at most one Running + 1 Waiting)
+      assertThat(runner.increment(FIRST)).isEqualTo(1); // Was Running
+      assertThat(runner.increment(SECOND)).isEqualTo(2); // Was Waiting
+      // Asserts that the one that still needed to be queued is not blocking this external task
+      assertThat(external.awaitStart(1)).isEqualTo(true);
+
+      for (int i = 3; i <= items.size(); i++) {
+        runner.increment(item(i));
+      }
+      assertThat(missedAwaits.get()).isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void saturatesManyFreeThreads() throws Exception {
+    int threads = 10;
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+
+    for (int j = 1; j <= MANY_ITEMS_SIZE; j += threads) {
+      // If #threads items can start before any complete, it proves #threads are
+      // running in parallel and saturating all available threads.
+      for (int i = j; i < j + threads; i++) {
+        assertThat(runner.awaitStart(item(i), threads)).isEqualTo(true);
+      }
+      for (int i = j; i < j + threads; i++) {
+        assertThat(runner.increment(item(i))).isEqualTo(i);
+      }
+    }
+
+    assertThat(runner.awaitDone(threads)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void makesProgressEvenWhenSaturatedByOtherTasks() throws Exception {
+    int blockSize = 5; // how many batches to queue at once
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(blockSize);
+    List<String> items = createManyItems();
+    WaitingRunner runner = new WaitingRunner();
+
+    int batchSize = 5; // how many tasks are started concurrently
+    Queue<CountDownLatch> batches = new LinkedList<>();
+    for (int b = 0; b < blockSize; b++) {
+      batches.add(executeWaitingRunnableBatch(batchSize, executor));
+    }
+
+    new ChainedScheduler<>(executor, items.iterator(), runner);
+
+    for (int i = 1; i <= items.size(); i++) {
+      for (int b = 0; b < blockSize; b++) {
+        // Ensure saturation by always having at least a full thread count of
+        // other tasks waiting in the queue after the waiting item so that when
+        // one batch is executed, and the item then executes, there will still
+        // be at least a full batch waiting.
+        batches.add(executeWaitingRunnableBatch(batchSize, executor));
+        batches.remove().countDown();
+      }
+      assertThat(runner.increment(item(i), batchSize)).isEqualTo(i); // Assert progress can be made
+    }
+    assertThat(runner.runCount()).isEqualTo(items.size());
+
+    while (batches.size() > 0) {
+      batches.remove().countDown();
+    }
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void forwardingRunnerForwards() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler<>(
+        executor, items.iterator(), new ChainedScheduler.ForwardingRunner<>(runner));
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void streamSchedulerClosesStream() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+    items.add(SECOND);
+
+    final AtomicBoolean closed = new AtomicBoolean(false);
+    Object closeRecorder =
+        new Object() {
+          @SuppressWarnings("unused") // Called via reflection
+          public void close() {
+            closed.set(true);
+          }
+        };
+    @SuppressWarnings("unchecked") // Stream.class is converted to Stream<String>.class
+    Stream<String> stream = ForwardingProxy.create(Stream.class, items.stream(), closeRecorder);
+
+    new ChainedScheduler.StreamScheduler<>(executor, stream, runner);
+    assertThat(closed.get()).isEqualTo(false);
+
+    // Since there is only a single thread, the Stream cannot get closed before this runs
+    runner.increment(FIRST);
+    // The Stream should get closed as the last item (SECOND) runs, before its runner is called
+    runner.increment(SECOND); // Ensure the last item's runner has already been called
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(closed.get()).isEqualTo(true);
+  }
+
+  protected CountDownLatch executeWaitingRunnableBatch(
+      int batchSize, ScheduledThreadPoolExecutor executor) {
+    CountDownLatch latch = new CountDownLatch(1);
+    for (int e = 0; e < batchSize; e++) {
+      executor.execute(new WaitingRunnable(latch));
+    }
+    return latch;
+  }
+
+  protected static List<String> createManyItems() {
+    return createItems(MANY_ITEMS_SIZE);
+  }
+
+  protected static List<String> createItems(int count) {
+    List<String> items = new ArrayList<>();
+    for (int i = 1; i <= count; i++) {
+      items.add(item(i));
+    }
+    return items;
+  }
+
+  protected static String item(int i) {
+    return "Item #" + i;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
new file mode 100644
index 0000000..a1f61fe
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -0,0 +1,81 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * A ForwardingProxy creates a Proxy which forwards all method calls to its delegate except for
+ * calls to methods which are implemented by its overrider's class.
+ *
+ * <p>Using this Proxy class makes it possible to use the delegate pattern on any interface without
+ * having to implement any of the interface's methods which directly forward their calls to the
+ * delegate. Using this is intended to make forwarding automated, easy, and less error prone by
+ * making it possible to implement the delegate pattern with an overrider object which only
+ * implements those methods which need overridden functionality and which will not directly forward
+ * their calls to the delegate.
+ *
+ * <p>The overrider object will be assumed to not implement any default java Object methods which
+ * are not overridden, as that would likely not be desirable behavior, and thus the Proxy will not
+ * forward those methods to the overrider unless the overrider overrides them.
+ *
+ * <p>If an overrider needs to make calls to the delegate, this can be achieved by passing the
+ * delegate into the overrider during construction.
+ */
+public class ForwardingProxy {
+  protected static class Handler<T> implements InvocationHandler {
+    protected T delegate;
+    protected Object overrider;
+
+    protected Handler(T delegate, Object overrider) {
+      this.delegate = delegate;
+      this.overrider = overrider;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Method overriden = getOverriden(method);
+      if (overriden != null) {
+        return overriden.invoke(overrider, args);
+      }
+      return method.invoke(delegate, args);
+    }
+
+    protected Method getOverriden(Method method) {
+      try {
+        Method implementedByOverrider =
+            overrider.getClass().getMethod(method.getName(), method.getParameterTypes());
+
+        // Only allow defined (non java defaulted) methods to actually be overridden
+        if (Object.class != implementedByOverrider.getDeclaringClass()) {
+          return implementedByOverrider;
+        }
+      } catch (NoSuchMethodException | SecurityException e) {
+      }
+      return null;
+    }
+  }
+
+  @SuppressWarnings("unchecked") // newProxyInstance returns Object
+  public static <T> T create(Class<T> toProxy, T delegate, Object overrider) {
+    return (T)
+        Proxy.newProxyInstance(
+            delegate.getClass().getClassLoader(),
+            new Class[] {toProxy},
+            new Handler<>(delegate, overrider));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 3619add..54afa5f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -188,8 +188,9 @@
 
   private List<ReplicateRefUpdate> listWaitingTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.listWaiting().stream()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+    return tasksStorage
+        .streamWaiting()
+        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
         .collect(toList());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index dd584ce..a65b257 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Test;
@@ -118,7 +119,7 @@
     reloadConfig();
 
     String changeRef = createChange().getPatchSet().refName();
-    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+    changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
         .forEach(
             (update) -> {
               try {
@@ -147,7 +148,7 @@
     reloadConfig();
 
     String changeRef = createChange().getPatchSet().refName();
-    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+    changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
         .forEach(
             (update) -> {
               try {
@@ -231,11 +232,14 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
 
-    assertThat(tasksStorage.listWaiting()).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
-    }
+    assertThat(listWaiting()).hasSize(1);
+    tasksStorage
+        .streamWaiting()
+        .forEach(
+            (task) -> {
+              assertThat(task.uri()).isEqualTo(expectedURI);
+              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+            });
   }
 
   @Test
@@ -253,11 +257,14 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
 
-    assertThat(tasksStorage.listWaiting()).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-      assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
-    }
+    assertThat(listWaiting()).hasSize(1);
+    tasksStorage
+        .streamWaiting()
+        .forEach(
+            (task) -> {
+              assertThat(task.uri()).isEqualTo(expectedURI);
+              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+            });
   }
 
   @Test
@@ -296,13 +303,13 @@
     config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
     config.save();
     reloadConfig();
-    assertThat(tasksStorage.listRunning()).hasSize(0);
+    assertThat(listRunning()).hasSize(0);
     Project.NameKey sourceProject = createTestProject("task_cleanup_project");
 
     WaitUtil.waitUntil(
         () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
         TEST_NEW_PROJECT_TIMEOUT);
-    WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
+    WaitUtil.waitUntil(() -> listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
   }
 
   @Test
@@ -311,7 +318,7 @@
     config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
     config.save();
     reloadConfig();
-    assertThat(tasksStorage.listRunning()).hasSize(0);
+    assertThat(listRunning()).hasSize(0);
     Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
 
     WaitUtil.waitUntil(
@@ -369,22 +376,32 @@
 
   private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
       String changeRef, String remote) {
-    return tasksStorage.listWaiting().stream()
-        .filter(task -> changeRef.equals(task.ref))
-        .filter(task -> remote.equals(task.remote));
+    return tasksStorage
+        .streamWaiting()
+        .filter(task -> changeRef.equals(task.ref()))
+        .filter(task -> remote.equals(task.remote()));
   }
 
   private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
       Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
     return updates
-        .filter(task -> changeRef.equals(task.ref))
-        .filter(task -> remote.equals(task.remote));
+        .filter(task -> changeRef.equals(task.ref()))
+        .filter(task -> remote.equals(task.remote()));
   }
 
   private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.listWaiting().stream()
-        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+    return tasksStorage
+        .streamWaiting()
+        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
         .collect(toList());
   }
+
+  private List<ReplicateRefUpdate> listWaiting() {
+    return tasksStorage.streamWaiting().collect(Collectors.toList());
+  }
+
+  private List<ReplicateRefUpdate> listRunning() {
+    return tasksStorage.streamRunning().collect(Collectors.toList());
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 42c0914..25752b4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,11 +14,12 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertContainsExactly;
 import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream;
 
 import com.google.common.jimfs.Configuration;
 import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
@@ -33,8 +34,8 @@
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH =
       ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
-  protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
-      new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
   protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
 
   protected ReplicationTasksStorage nodeA;
@@ -64,7 +65,7 @@
     nodeA.create(REF_UPDATE);
 
     nodeB.create(REF_UPDATE);
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
   }
 
   @Test
@@ -72,7 +73,7 @@
     nodeA.create(REF_UPDATE);
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
@@ -84,10 +85,10 @@
     nodeA.start(URI_UPDATES);
 
     nodeA.reset(URI_UPDATES);
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
@@ -101,10 +102,10 @@
     nodeB.start(URI_UPDATES);
 
     nodeB.reset(URI_UPDATES);
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
@@ -119,22 +120,22 @@
     nodeB.reset(URI_UPDATES);
 
     nodeA.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeA.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
   }
 
   @Test
-  public void canBeResetAllAndCompletedByOtherNode() {
+  public void canBeRecoveredAndCompletedByOtherNode() {
     nodeA.create(REF_UPDATE);
     nodeA.start(URI_UPDATES);
 
-    nodeB.resetAll();
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    nodeB.recoverAll();
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeA.finish(URI_UPDATES);
     // Bug: https://crbug.com/gerrit/12973
@@ -145,29 +146,29 @@
   }
 
   @Test
-  public void canBeResetAllAndCompletedByOtherNodeFastOriginalNode() {
+  public void canBeRecoveredAndCompletedByOtherNodeFastOriginalNode() {
     nodeA.create(REF_UPDATE);
     nodeA.start(URI_UPDATES);
-    nodeB.resetAll();
+    nodeB.recoverAll();
 
     nodeA.finish(URI_UPDATES);
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
   }
 
   @Test
-  public void canBeResetAllAndCompletedByOtherNodeSlowOriginalNode() {
+  public void canBeRecoveredAndCompletedByOtherNodeSlowOriginalNode() {
     nodeA.create(REF_UPDATE);
     nodeA.start(URI_UPDATES);
-    nodeB.resetAll();
+    nodeB.recoverAll();
 
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     ReplicationTasksStorageTest.assertNoIncompleteTasks(persistedView);
@@ -180,14 +181,14 @@
   public void multipleNodesCanReplicateSameRef() {
     nodeA.create(REF_UPDATE);
     nodeA.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeA.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
 
     nodeB.create(REF_UPDATE);
     nodeB.start(URI_UPDATES);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     nodeB.finish(URI_UPDATES);
     assertNoIncompleteTasks(persistedView);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
index 23d6759..202cac9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -21,6 +21,7 @@
 
 import com.google.common.jimfs.Configuration;
 import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
 import org.eclipse.jgit.transport.URIish;
@@ -34,8 +35,8 @@
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH =
       ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
-  protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
-      new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
 
   protected FileSystem fileSystem;
   protected Path storageSite;
@@ -131,11 +132,11 @@
   }
 
   @Test
-  public void canBeResetAllAndCompletedByOtherNode() {
+  public void canBeRecoveredAndCompletedByOtherNode() {
     taskA.create();
     taskA.start();
 
-    nodeB.resetAll();
+    nodeB.recoverAll();
     assertIsWaiting(taskA);
 
     taskB.create();
@@ -154,10 +155,10 @@
   }
 
   @Test
-  public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
+  public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
     taskA.create();
     taskA.start();
-    nodeB.resetAll();
+    nodeB.recoverAll();
 
     taskA.finish();
     assertIsWaiting(taskA);
@@ -173,10 +174,10 @@
   }
 
   @Test
-  public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
+  public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
     taskA.create();
     taskA.start();
-    nodeB.resetAll();
+    nodeB.recoverAll();
 
     taskB.start();
     assertIsRunning(taskA);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index d9fbbe5..a2e5e4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -36,7 +36,7 @@
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
 
   protected ReplicationTasksStorage tasksStorage;
   protected FileSystem fileSystem;
@@ -200,8 +200,8 @@
 
   @Test
   public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
-    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
-    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
     updateA.create();
     updateB.create();
     assertIsWaiting(updateA);
@@ -210,8 +210,8 @@
 
   @Test
   public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
-    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
-    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
+    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
     updateA.create();
     updateB.create();
     updateA.start();
@@ -225,12 +225,12 @@
     Task updateA =
         tasksStorage
         .new Task(
-            new ReplicateRefUpdate(
+            ReplicateRefUpdate.create(
                 "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
     Task updateB =
         tasksStorage
         .new Task(
-            new ReplicateRefUpdate(
+            ReplicateRefUpdate.create(
                 "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
     updateA.create();
     updateB.create();
@@ -243,12 +243,12 @@
     Task updateA =
         tasksStorage
         .new Task(
-            new ReplicateRefUpdate(
+            ReplicateRefUpdate.create(
                 "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
     Task updateB =
         tasksStorage
         .new Task(
-            new ReplicateRefUpdate(
+            ReplicateRefUpdate.create(
                 "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
     updateA.create();
     updateB.create();
@@ -260,8 +260,8 @@
 
   @Test
   public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
-    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
-    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
     updateA.create();
     updateB.create();
     assertIsWaiting(updateA);
@@ -270,8 +270,8 @@
 
   @Test
   public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
-    Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
-    Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
+    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
     updateA.create();
     updateB.create();
     updateA.start();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 141f739..48e145b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -22,12 +22,13 @@
 
 import com.google.common.jimfs.Configuration;
 import com.google.common.jimfs.Jimfs;
+import com.google.common.truth.IterableSubject;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
 import org.junit.Before;
@@ -39,7 +40,7 @@
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
 
   protected ReplicationTasksStorage storage;
   protected FileSystem fileSystem;
@@ -61,14 +62,14 @@
 
   @Test
   public void canListEmptyStorage() throws Exception {
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
   public void canListWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
   }
 
   @Test
@@ -84,9 +85,9 @@
   public void canStartWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
     assertFalse(storage.isWaiting(uriUpdates));
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
   }
 
   @Test
@@ -101,22 +102,22 @@
   public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
     ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite);
 
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(persistedView.listWaiting()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(persistedView.streamWaiting()).isEmpty();
 
     storage.create(REF_UPDATE);
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
-    assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     storage.start(uriUpdates);
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(persistedView.listWaiting()).isEmpty();
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
-    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(persistedView.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     storage.finish(uriUpdates);
-    assertThat(storage.listRunning()).isEmpty();
-    assertThat(persistedView.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
+    assertThatStream(persistedView.streamRunning()).isEmpty();
   }
 
   @Test
@@ -124,13 +125,13 @@
     String key = storage.create(REF_UPDATE);
     String secondKey = storage.create(REF_UPDATE);
     assertEquals(key, secondKey);
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
   }
 
   @Test
   public void canCreateDifferentUris() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -138,7 +139,7 @@
 
     String keyA = storage.create(REF_UPDATE);
     String keyB = storage.create(updateB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
     assertTrue(storage.isWaiting(uriUpdates));
     assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
     assertNotEquals(keyA, keyB);
@@ -147,7 +148,7 @@
   @Test
   public void canStartDifferentUris() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -157,18 +158,18 @@
     storage.create(updateB);
 
     storage.start(uriUpdates);
-    assertContainsExactly(storage.listWaiting(), updateB);
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
 
     storage.start(uriUpdatesB);
-    assertThat(storage.listWaiting()).isEmpty();
-    assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
   }
 
   @Test
   public void canFinishDifferentUris() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -180,16 +181,16 @@
     storage.start(uriUpdatesB);
 
     storage.finish(uriUpdates);
-    assertContainsExactly(storage.listRunning(), updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(updateB);
 
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
   public void differentUrisCreatedTwiceIsStoredOnce() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -199,19 +200,19 @@
     storage.create(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
     assertTrue(storage.isWaiting(uriUpdates));
     assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
   }
 
   @Test
   public void canCreateMulipleRefsForSameUri() throws Exception {
-    ReplicateRefUpdate refA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
-    ReplicateRefUpdate refB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+    ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
+    ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
 
     String keyA = storage.create(refA);
     String keyB = storage.create(refB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
     assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
     assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
@@ -219,8 +220,8 @@
 
   @Test
   public void canFinishMulipleRefsForSameUri() throws Exception {
-    ReplicateRefUpdate refUpdateA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
-    ReplicateRefUpdate refUpdateB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+    ReplicateRefUpdate refUpdateA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
+    ReplicateRefUpdate refUpdateB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
     UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA);
     UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB);
     storage.create(refUpdateA);
@@ -229,10 +230,10 @@
     storage.start(uriUpdatesB);
 
     storage.finish(uriUpdatesA);
-    assertContainsExactly(storage.listRunning(), refUpdateB);
+    assertThatStream(storage.streamRunning()).containsExactly(refUpdateB);
 
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -241,8 +242,8 @@
     storage.start(uriUpdates);
 
     storage.reset(uriUpdates);
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -252,40 +253,40 @@
     storage.reset(uriUpdates);
 
     storage.start(uriUpdates);
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
     assertFalse(storage.isWaiting(uriUpdates));
-    assertThat(storage.listWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
     assertNoIncompleteTasks(storage);
   }
 
   @Test
-  public void canResetAllEmpty() throws Exception {
-    storage.resetAll();
+  public void canRecoverEmpty() throws Exception {
+    storage.recoverAll();
     assertNoIncompleteTasks(storage);
   }
 
   @Test
-  public void canResetAllUpdate() throws Exception {
+  public void canRecoverUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
 
-    storage.resetAll();
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+    storage.recoverAll();
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamRunning()).isEmpty();
     assertTrue(storage.isWaiting(uriUpdates));
-    assertThat(storage.listRunning()).isEmpty();
   }
 
   @Test
-  public void canCompleteResetAllUpdate() throws Exception {
+  public void canCompleteRecoveredUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
-    storage.resetAll();
+    storage.recoverAll();
 
     storage.start(uriUpdates);
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
     assertFalse(storage.isWaiting(uriUpdates));
 
     storage.finish(uriUpdates);
@@ -293,9 +294,9 @@
   }
 
   @Test
-  public void canResetAllMultipleUpdates() throws Exception {
+  public void canRecoverMultipleUpdates() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -306,14 +307,14 @@
     storage.start(uriUpdates);
     storage.start(uriUpdatesB);
 
-    storage.resetAll();
-    assertContainsExactly(storage.listWaiting(), REF_UPDATE, updateB);
+    storage.recoverAll();
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB);
   }
 
   @Test
-  public void canCompleteMultipleResetAllUpdates() throws Exception {
+  public void canCompleteMultipleRecoveredUpdates() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -323,15 +324,15 @@
     storage.create(updateB);
     storage.start(uriUpdates);
     storage.start(uriUpdatesB);
-    storage.resetAll();
+    storage.recoverAll();
 
     storage.start(uriUpdates);
-    assertContainsExactly(storage.listRunning(), REF_UPDATE);
-    assertContainsExactly(storage.listWaiting(), updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(updateB);
 
     storage.start(uriUpdatesB);
-    assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+    assertThatStream(storage.streamWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
     storage.finish(uriUpdatesB);
@@ -355,7 +356,7 @@
   @Test(expected = Test.None.class /* no exception expected */)
   public void illegalDoubleFinishDifferentUriIsGraceful() throws Exception {
     ReplicateRefUpdate updateB =
-        new ReplicateRefUpdate(
+        ReplicateRefUpdate.create(
             PROJECT,
             REF,
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -370,30 +371,16 @@
 
     storage.finish(uriUpdates);
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   protected static void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
-  protected static void assertContainsExactly(
-      List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) {
-    assertThat(all).hasSize(refUpdates.length);
-    for (int i = 0; i < refUpdates.length; i++) {
-      assertTrue(equals(all.get(i), refUpdates[i]));
-    }
-  }
-
-  public static boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
-    return (one == null && two == null)
-        || (one != null
-            && two != null
-            && Objects.equals(one.project, two.project)
-            && Objects.equals(one.ref, two.ref)
-            && Objects.equals(one.remote, two.remote)
-            && Objects.equals(one.uri, two.uri));
+  protected static IterableSubject assertThatStream(Stream<?> stream) {
+    return assertThat(stream.collect(Collectors.toList()));
   }
 
   public static URIish getUrish(String uri) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index 2fd3ee3..f61114e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -26,10 +26,10 @@
 public abstract class TestUriUpdates implements UriUpdates {
   public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
     return create(
-        Project.nameKey(update.project),
-        new URIish(update.uri),
-        update.remote,
-        Collections.singleton(update.ref));
+        Project.nameKey(update.project()),
+        new URIish(update.uri()),
+        update.remote(),
+        Collections.singleton(update.ref()));
   }
 
   public static TestUriUpdates create(