No longer use an extra thread to schedule pending events

Move processing of pending events to the defaultQueue in a non invasive
way so that this processing should not interfere with other important
tasks in the queue for longer than it takes to process a single entry.
This is achieved by using a new ChainedScheduler class which breaks the
workload up into small pieces with their own task each, and only places
a new task in the queue each time a task begins execution. This creates
a natural load based throttling which can execute fast when there is no
load in the executor, but mostly gets out of the way when load is high.

This move also avoids delaying plugin startup until all the pending
events are read from disk. This allows new replication events being
created on the server to be scheduled before all pending events have
been scheduled which results in a slight priority boost for new events
over pending events.

Change-Id: Icddf88b85dec9078bb4a7923b9211614989978ff
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
new file mode 100644
index 0000000..ff3fdf3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -0,0 +1,179 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Stream;
+
+/**
+ * Non-greedily schedules consecutive tasks in a executor, one for each item returned by an {@link
+ * Iterator}.
+ *
+ * <p>This scheduler is useful when an {@link Iterator} might provide a large amount of items to be
+ * worked on in a non prioritized fashion. This scheduler will scavenge any unused threads in its
+ * executor to do work, however only one item will ever be outstanding and waiting at a time. This
+ * scheduling policy allows large collections to be processed without interfering much with higher
+ * prioritized tasks while still making regular progress on the items provided by the {@link
+ * Iterator}. To keep the level of interference to a minimum, ensure that the amount of work needed
+ * for each item is small and short.
+ */
+public class ChainedScheduler<T> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /**
+   * Override to implement a common thread pool task for all the items returned by the {@link
+   * Iterator}
+   */
+  public interface Runner<T> {
+    /** Will get executed in the thread pool task for each item */
+    default void run(T item) {}
+
+    /** Will get called after the last item completes */
+    default void onDone() {}
+
+    /** Will get called to display {@link Runnable} for item in show-queue output */
+    default String toString(T item) {
+      return "Chained " + item.toString();
+    }
+  }
+
+  /** Override to decorate an existing {@link Runner} */
+  public static class ForwardingRunner<T> implements Runner<T> {
+    protected Runner<T> delegateRunner;
+
+    public ForwardingRunner(Runner<T> delegate) {
+      delegateRunner = delegate;
+    }
+
+    @Override
+    public void run(T item) {
+      delegateRunner.run(item);
+    }
+
+    @Override
+    public void onDone() {
+      delegateRunner.onDone();
+    }
+
+    @Override
+    public String toString(T item) {
+      return delegateRunner.toString(item);
+    }
+  }
+
+  /**
+   * Use when a {@link Stream} is needed instead of an {@link Iterator}, it will close the {@link
+   * Stream}
+   */
+  public static class StreamScheduler<T> extends ChainedScheduler<T> {
+    public StreamScheduler(
+        ScheduledExecutorService threadPool, final Stream<T> stream, Runner<T> runner) {
+      super(
+          threadPool,
+          stream.iterator(),
+          new ForwardingRunner<T>(runner) {
+            @Override
+            public void onDone() {
+              stream.close();
+              super.onDone();
+            }
+          });
+    }
+  }
+
+  /** Internal {@link Runnable} containing one item to run and which schedules the next one. */
+  protected class Chainer implements Runnable {
+    protected T item;
+
+    public Chainer(T item) {
+      this.item = item;
+    }
+
+    @Override
+    public void run() {
+      boolean scheduledNext = scheduleNext();
+      try {
+        runner.run(item);
+      } catch (RuntimeException e) { // catch to prevent chain from breaking
+        logger.atSevere().withCause(e).log("Error while running: " + item);
+      }
+      if (!scheduledNext) {
+        runner.onDone();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return runner.toString(item);
+    }
+  }
+
+  protected final ScheduledExecutorService threadPool;
+  protected final Iterator<T> iterator;
+  protected final Runner<T> runner;
+
+  /**
+   * Note: The {@link Iterator} passed in will only ever be accessed from one thread at a time, and
+   * the internal state of the {@link Iterator} will be updated after each next() call before
+   * operating on the iterator again.
+   */
+  public ChainedScheduler(
+      ScheduledExecutorService threadPool, Iterator<T> iterator, Runner<T> runner) {
+    this.threadPool = threadPool;
+    this.iterator = iterator;
+    this.runner = runner;
+
+    if (!scheduleNext()) {
+      runner.onDone();
+    }
+  }
+
+  /**
+   * Concurrency note:
+   *
+   * <p>Since there is only one chain of tasks and each task submits the next task to the executor,
+   * the calls from here to the iterator.next() call will never be executed concurrently by more
+   * than one thread.
+   *
+   * <p>Data synchronization note:
+   *
+   * <p>This section in the [1] javadoc: "Actions in a thread prior to the submission of a Runnable
+   * to an Executor happen-before its execution begins..." guarantees that since the iterator.next()
+   * happens before the schedule(), and the new tasks call to hasNext() happen after the submission,
+   * that the hasNext() call will see the results of the previous next() call even though it may
+   * have happened on a different thread.
+   *
+   * <p>[1]
+   * <li>https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
+   *     (section "Memory Consistency Properties")
+   * <li>The methods of all classes in java.util.concurrent and its subpackages extends the
+   *     guarantees of the java memory model to higher-level synchronization.
+   * <li>In particular this guarantee of the java.util.concurrent applies here:
+   */
+  protected boolean scheduleNext() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+
+    schedule(new Chainer(iterator.next()));
+    return true;
+  }
+
+  protected void schedule(Runnable r) {
+    threadPool.execute(r);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 90d159d..e277cb0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -88,9 +88,7 @@
       destinations.get().startup(workQueue);
       running = true;
       replicationTasksStorage.resetAll();
-      Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
-      t.setDaemon(true);
-      t.start();
+      firePendingEvents();
       fireBeforeStartupEvents();
       distributor = new Distributor(workQueue);
     }
@@ -200,24 +198,32 @@
 
   private void firePendingEvents() {
     replaying = true;
-    try {
-      replaying = true;
-      replicationTasksStorage
-          .streamWaiting()
-          .forEach(
-              t -> {
-                try {
-                  fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref());
-                } catch (URISyntaxException e) {
-                  repLog.atSevere().withCause(e).log(
-                      "Encountered malformed URI for persisted event %s", t);
-                }
-              });
-    } catch (Throwable e) {
-      repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
-    } finally {
-      replaying = false;
-    }
+    new ChainedScheduler.StreamScheduler<ReplicationTasksStorage.ReplicateRefUpdate>(
+        workQueue.getDefaultQueue(),
+        replicationTasksStorage.streamWaiting(),
+        new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
+          @Override
+          public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
+            try {
+              fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+            } catch (URISyntaxException e) {
+              repLog.atSevere().withCause(e).log(
+                  "Encountered malformed URI for persisted event %s", u);
+            } catch (Throwable e) {
+              repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
+            }
+          }
+
+          @Override
+          public void onDone() {
+            replaying = false;
+          }
+
+          @Override
+          public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
+            return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+          }
+        });
   }
 
   private void pruneCompleted() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
new file mode 100644
index 0000000..1ad8c4d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -0,0 +1,468 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChainedSchedulerTest {
+  /** A simple {@link Runnable} that waits until the start() method is called. */
+  public class WaitingRunnable implements Runnable {
+    protected final CountDownLatch start;
+
+    public WaitingRunnable() {
+      this(new CountDownLatch(1));
+    }
+
+    public WaitingRunnable(CountDownLatch latch) {
+      this.start = latch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        start.await();
+      } catch (InterruptedException e) {
+        missedAwaits.incrementAndGet();
+      }
+    }
+
+    public void start() {
+      start.countDown();
+    }
+  }
+
+  /** A simple {@link Runnable} that can be awaited to start to run. */
+  public static class WaitableRunnable implements Runnable {
+    protected final CountDownLatch started = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      started.countDown();
+    }
+
+    public boolean isStarted() {
+      return started.getCount() == 0;
+    }
+
+    public boolean awaitStart(int itemsBefore) {
+      try {
+        return started.await(SECONDS_SYNCHRONIZE * itemsBefore, SECONDS);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+  }
+
+  /** An {@link Iterator} wrapper which keeps track of how many times next() has been called. */
+  public static class CountingIterator extends ForwardingIterator<String> {
+    public volatile int count = 0;
+
+    protected Iterator<String> delegate;
+
+    public CountingIterator(Iterator<String> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public synchronized String next() {
+      count++;
+      return super.next();
+    }
+
+    @Override
+    protected Iterator<String> delegate() {
+      return delegate;
+    }
+  }
+
+  /** A {@link ChainedScheduler.Runner} which keeps track of completion and counts. */
+  public static class TestRunner implements ChainedScheduler.Runner<String> {
+    protected final AtomicInteger runCount = new AtomicInteger(0);
+    protected final CountDownLatch onDone = new CountDownLatch(1);
+
+    @Override
+    public void run(String item) {
+      incrementAndGet();
+    }
+
+    public int runCount() {
+      return runCount.get();
+    }
+
+    @Override
+    public void onDone() {
+      onDone.countDown();
+    }
+
+    public boolean isDone() {
+      return onDone.getCount() <= 0;
+    }
+
+    public boolean awaitDone(int items) {
+      try {
+        return onDone.await(items * SECONDS_SYNCHRONIZE, SECONDS);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    protected int incrementAndGet() {
+      return runCount.incrementAndGet();
+    }
+  }
+
+  /**
+   * A {@link TestRunner} that can be awaited to start to run and additionally will wait until
+   * increment() or runOnewRandomStarted() is called to complete.
+   */
+  public class WaitingRunner extends TestRunner {
+    protected class RunContext extends WaitableRunnable {
+      CountDownLatch run = new CountDownLatch(1);
+      CountDownLatch ran = new CountDownLatch(1);
+      int count;
+
+      @Override
+      public void run() {
+        super.run();
+        try {
+          run.await();
+          count = incrementAndGet();
+          ran.countDown();
+        } catch (InterruptedException e) {
+          missedAwaits.incrementAndGet();
+        }
+      }
+
+      public synchronized boolean startIfNotRunning() throws InterruptedException {
+        if (run.getCount() > 0) {
+          increment();
+          return true;
+        }
+        return false;
+      }
+
+      public synchronized int increment() throws InterruptedException {
+        run.countDown();
+        ran.await(); // no timeout needed as RunContext.run() calls countDown unless interrupted
+        return count;
+      }
+    }
+
+    protected final Map<String, RunContext> ctxByItem = new ConcurrentHashMap<>();
+
+    @Override
+    public void run(String item) {
+      context(item).run();
+    }
+
+    public void runOneRandomStarted() throws InterruptedException {
+      while (true) {
+        for (RunContext ctx : ctxByItem.values()) {
+          if (ctx.isStarted()) {
+            if (ctx.startIfNotRunning()) {
+              return;
+            }
+          }
+        }
+        MILLISECONDS.sleep(1);
+      }
+    }
+
+    public boolean awaitStart(String item, int itemsBefore) {
+      return context(item).awaitStart(itemsBefore);
+    }
+
+    public int increment(String item) throws InterruptedException {
+      return increment(item, 1);
+    }
+
+    public int increment(String item, int itemsBefore) throws InterruptedException {
+      awaitStart(item, itemsBefore);
+      return context(item).increment();
+    }
+
+    protected RunContext context(String item) {
+      return ctxByItem.computeIfAbsent(item, k -> new RunContext());
+    }
+  }
+
+  // Time for one synchronization event such as await(), or variable
+  // incrementing across threads to take
+  public static final int SECONDS_SYNCHRONIZE = 3;
+  public static final int MANY_ITEMS_SIZE = 1000;
+
+  public static String FIRST = item(1);
+  public static String SECOND = item(2);
+  public static String THIRD = item(3);
+
+  public final AtomicInteger missedAwaits = new AtomicInteger(0); // non-zero signals an error
+
+  @Before
+  public void setup() {
+    missedAwaits.set(0);
+  }
+
+  @Test
+  public void emptyCompletesImmediately() {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = new ArrayList<>();
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(1)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void oneItemCompletes() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(1)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void manyItemsAllComplete() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void exceptionInTaskDoesNotAbortIteration() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner =
+        new TestRunner() {
+          @Override
+          public void run(String item) {
+            super.run(item);
+            throw new RuntimeException();
+          }
+        };
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+    items.add(SECOND);
+    items.add(THIRD);
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void onDoneNotCalledBeforeAllCompleted() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+    for (int i = 1; i <= items.size(); i++) {
+      assertThat(runner.isDone()).isEqualTo(false);
+      runner.runOneRandomStarted();
+    }
+
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void otherTasksOnlyEverWaitForAtMostOneRunningPlusOneWaiting() throws Exception {
+    for (int threads = 1; threads <= 10; threads++) {
+      ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+      WaitingRunner runner = new WaitingRunner();
+      List<String> items = createItems(threads + 1 /* Running */ + 1 /* Waiting */);
+      CountingIterator it = new CountingIterator(items.iterator());
+
+      new ChainedScheduler(executor, it, runner);
+      assertThat(runner.awaitStart(FIRST, 1)).isEqualTo(true); // Confirms at least one Running
+      assertThat(it.count).isGreaterThan(1); // Confirms at least one extra Waiting or Running
+      assertThat(it.count).isLessThan(items.size()); // Confirms at least one still not queued
+
+      WaitableRunnable external = new WaitableRunnable();
+      executor.execute(external);
+      assertThat(external.isStarted()).isEqualTo(false);
+
+      // Completes 2, (at most one Running + 1 Waiting)
+      assertThat(runner.increment(FIRST)).isEqualTo(1); // Was Running
+      assertThat(runner.increment(SECOND)).isEqualTo(2); // Was Waiting
+      // Asserts that the one that still needed to be queued is not blocking this external task
+      assertThat(external.awaitStart(1)).isEqualTo(true);
+
+      for (int i = 3; i <= items.size(); i++) {
+        runner.increment(item(i));
+      }
+      assertThat(missedAwaits.get()).isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void saturatesManyFreeThreads() throws Exception {
+    int threads = 10;
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+
+    for (int j = 1; j <= MANY_ITEMS_SIZE; j += threads) {
+      // If #threads items can start before any complete, it proves #threads are
+      // running in parallel and saturating all available threads.
+      for (int i = j; i < j + threads; i++) {
+        assertThat(runner.awaitStart(item(i), threads)).isEqualTo(true);
+      }
+      for (int i = j; i < j + threads; i++) {
+        assertThat(runner.increment(item(i))).isEqualTo(i);
+      }
+    }
+
+    assertThat(runner.awaitDone(threads)).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void makesProgressEvenWhenSaturatedByOtherTasks() throws Exception {
+    int blockSize = 5; // how many batches to queue at once
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(blockSize);
+    List<String> items = createManyItems();
+    WaitingRunner runner = new WaitingRunner();
+
+    int batchSize = 5; // how many tasks are started concurrently
+    Queue<CountDownLatch> batches = new LinkedList<>();
+    for (int b = 0; b < blockSize; b++) {
+      batches.add(executeWaitingRunnableBatch(batchSize, executor));
+    }
+
+    new ChainedScheduler(executor, items.iterator(), runner);
+
+    for (int i = 1; i <= items.size(); i++) {
+      for (int b = 0; b < blockSize; b++) {
+        // Ensure saturation by always having at least a full thread count of
+        // other tasks waiting in the queue after the waiting item so that when
+        // one batch is executed, and the item then executes, there will still
+        // be at least a full batch waiting.
+        batches.add(executeWaitingRunnableBatch(batchSize, executor));
+        batches.remove().countDown();
+      }
+      assertThat(runner.increment(item(i), batchSize)).isEqualTo(i); // Assert progress can be made
+    }
+    assertThat(runner.runCount()).isEqualTo(items.size());
+
+    while (batches.size() > 0) {
+      batches.remove().countDown();
+    }
+    assertThat(missedAwaits.get()).isEqualTo(0);
+  }
+
+  @Test
+  public void forwardingRunnerForwards() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    TestRunner runner = new TestRunner();
+    List<String> items = createManyItems();
+
+    new ChainedScheduler(executor, items.iterator(), new ChainedScheduler.ForwardingRunner(runner));
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+  }
+
+  @Test
+  public void streamSchedulerClosesStream() throws Exception {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+    WaitingRunner runner = new WaitingRunner();
+    List<String> items = new ArrayList<>();
+    items.add(FIRST);
+    items.add(SECOND);
+
+    final AtomicBoolean closed = new AtomicBoolean(false);
+    Object closeRecorder =
+        new Object() {
+          public void close() {
+            closed.set(true);
+          }
+        };
+    Stream<String> stream =
+        ForwardingProxy.create(
+            Stream.class,
+            items.stream(),
+            new Object() {
+              public void close() {
+                closed.set(true);
+              }
+            });
+
+    new ChainedScheduler.StreamScheduler(executor, stream, runner);
+    assertThat(closed.get()).isEqualTo(false);
+
+    // Since there is only a single thread, the Stream cannot get closed before this runs
+    runner.increment(FIRST);
+    // The Stream should get closed as the last item (SECOND) runs, before its runner is called
+    runner.increment(SECOND); // Ensure the last item's runner has already been called
+    assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+    assertThat(runner.runCount()).isEqualTo(items.size());
+    assertThat(closed.get()).isEqualTo(true);
+  }
+
+  protected CountDownLatch executeWaitingRunnableBatch(
+      int batchSize, ScheduledThreadPoolExecutor executor) {
+    CountDownLatch latch = new CountDownLatch(1);
+    for (int e = 0; e < batchSize; e++) {
+      executor.execute(new WaitingRunnable(latch));
+    }
+    return latch;
+  }
+
+  protected static List<String> createManyItems() {
+    return createItems(MANY_ITEMS_SIZE);
+  }
+
+  protected static List<String> createItems(int count) {
+    List<String> items = new ArrayList<>();
+    for (int i = 1; i <= count; i++) {
+      items.add(item(i));
+    }
+    return items;
+  }
+
+  protected static String item(int i) {
+    return "Item #" + i;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
new file mode 100644
index 0000000..dbd538e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * A ForwardingProxy creates a Proxy which forwards all method calls to its delegate except for
+ * calls to methods which are implemented by its overrider's class.
+ *
+ * <p>Using this Proxy class makes it possible to use the delegate pattern on any interface without
+ * having to implement any of the interface's methods which directly forward their calls to the
+ * delegate. Using this is intended to make forwarding automated, easy, and less error prone by
+ * making it possible to implement the delegate pattern with an overrider object which only
+ * implements those methods which need overridden functionality and which will not directly forward
+ * their calls to the delegate.
+ *
+ * <p>The overrider object will be assumed to not implement any default java Object methods which
+ * are not overridden, as that would likely not be desirable behavior, and thus the Proxy will not
+ * forward those methods to the overrider unless the overrider overrides them.
+ *
+ * <p>If an overrider needs to make calls to the delegate, this can be achieved by passing the
+ * delegate into the overrider during construction.
+ */
+public class ForwardingProxy {
+  protected static class Handler<T> implements InvocationHandler {
+    protected T delegate;
+    protected Object overrider;
+
+    protected Handler(T delegate, Object overrider) {
+      this.delegate = delegate;
+      this.overrider = overrider;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Method overriden = getOverriden(method);
+      if (overriden != null) {
+        return overriden.invoke(overrider, args);
+      }
+      return method.invoke(delegate, args);
+    }
+
+    protected Method getOverriden(Method method) {
+      try {
+        Method implementedByOverrider =
+            overrider.getClass().getMethod(method.getName(), method.getParameterTypes());
+
+        // Only allow defined (non java defaulted) methods to actually be overridden
+        if (Object.class != implementedByOverrider.getDeclaringClass()) {
+          return implementedByOverrider;
+        }
+      } catch (NoSuchMethodException | SecurityException e) {
+      }
+      return null;
+    }
+  }
+
+  public static <T> T create(Class<T> toProxy, T delegate, Object overrider) {
+    return (T)
+        Proxy.newProxyInstance(
+            delegate.getClass().getClassLoader(),
+            new Class[] {toProxy},
+            new Handler(delegate, overrider));
+  }
+}