Merge "ReplicationTasksStorage: Use Stream API instead of List"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index cd4a3fb..90d159d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -202,13 +202,17 @@
     replaying = true;
     try {
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
-        try {
-          fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref());
-        } catch (URISyntaxException e) {
-          repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
-        }
-      }
+      replicationTasksStorage
+          .streamWaiting()
+          .forEach(
+              t -> {
+                try {
+                  fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref());
+                } catch (URISyntaxException e) {
+                  repLog.atSevere().withCause(e).log(
+                      "Encountered malformed URI for persisted event %s", t);
+                }
+              });
     } catch (Throwable e) {
       repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
     } finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 68661cf..c67bb49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,9 +32,7 @@
 import java.nio.file.NotDirectoryException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
-import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
@@ -147,9 +145,7 @@
   }
 
   public synchronized void resetAll() {
-    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
-      new Task(r).reset();
-    }
+    streamRunning().forEach(r -> new Task(r).reset());
   }
 
   public boolean isWaiting(UriUpdates uriUpdates) {
@@ -164,16 +160,12 @@
     }
   }
 
-  public List<ReplicateRefUpdate> listWaiting() {
-    return list(createDir(waitingUpdates));
+  public Stream<ReplicateRefUpdate> streamWaiting() {
+    return streamRecursive(createDir(waitingUpdates));
   }
 
-  public List<ReplicateRefUpdate> listRunning() {
-    return list(createDir(runningUpdates));
-  }
-
-  private List<ReplicateRefUpdate> list(Path taskDir) {
-    return streamRecursive(taskDir).collect(Collectors.toList());
+  public Stream<ReplicateRefUpdate> streamRunning() {
+    return streamRecursive(createDir(runningUpdates));
   }
 
   private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 7e336d1..7f8282a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -247,7 +247,7 @@
   private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
     synchronized (tasksStorage) {
-      return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream())
+      return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning())
           .filter(task -> refmaskPattern.matcher(task.ref()).matches())
           .collect(toList());
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 6878923..55c6a9c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -49,6 +49,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
@@ -447,11 +448,11 @@
     config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
     config.save();
     reloadConfig();
-    assertThat(tasksStorage.listRunning()).hasSize(0);
+    assertThat(listRunning()).hasSize(0);
     Project.NameKey sourceProject = createTestProject("task_cleanup_project");
 
     waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
-    waitUntil(() -> tasksStorage.listRunning().size() == 0);
+    waitUntil(() -> listRunning().size() == 0);
   }
 
   @Test
@@ -460,7 +461,7 @@
     config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
     config.save();
     reloadConfig();
-    assertThat(tasksStorage.listRunning()).hasSize(0);
+    assertThat(listRunning()).hasSize(0);
     Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
 
     waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
@@ -486,7 +487,7 @@
             .findFirst()
             .get();
 
-    waitUntil(() -> tasksStorage.listRunning().size() == 0);
+    waitUntil(() -> listRunning().size() == 0);
 
     createTestProject(projectName);
 
@@ -519,7 +520,7 @@
 
     String changeRef = createChange().getPatchSet().refName();
 
-    changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+    changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
         .forEach(
             (update) -> {
               try {
@@ -671,26 +672,33 @@
     return projectOperations.newProject().name(name).create();
   }
 
-  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    return streamIncompleteTasks()
-        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
-        .collect(toList());
-  }
-
-  private List<ReplicateRefUpdate> listIncompleteTasks() {
-    return streamIncompleteTasks().collect(toList());
+  public List<ReplicateRefUpdate> listRunning() {
+    return tasksStorage.streamRunning().collect(Collectors.toList());
   }
 
   @SuppressWarnings(
       "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
-  private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
+  private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
     synchronized (tasksStorage) {
-      return Stream.concat(
-          tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream());
+      return streamIncompleteTasks()
+          .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+          .collect(toList());
     }
   }
 
+  @SuppressWarnings(
+      "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
+  private List<ReplicateRefUpdate> listIncompleteTasks() {
+    synchronized (tasksStorage) {
+      return streamIncompleteTasks().collect(toList());
+    }
+  }
+
+  private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
+    return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning());
+  }
+
   public void cleanupReplicationTasks() throws IOException {
     cleanupReplicationTasks(storagePath);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 948044b..4b09d52 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -20,10 +20,13 @@
 
 import com.google.common.jimfs.Configuration;
 import com.google.common.jimfs.Jimfs;
+import com.google.common.truth.IterableSubject;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
 import org.junit.Before;
@@ -57,22 +60,22 @@
 
   @Test
   public void canListEmptyStorage() throws Exception {
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
   public void canListWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
   }
 
   @Test
   public void canStartWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
   }
 
   @Test
@@ -87,22 +90,22 @@
   public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
     ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite);
 
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(persistedView.listWaiting()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(persistedView.streamWaiting()).isEmpty();
 
     storage.create(REF_UPDATE);
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
-    assertThat(persistedView.listWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
 
     storage.start(uriUpdates);
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(persistedView.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
-    assertThat(persistedView.listRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(persistedView.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     storage.finish(uriUpdates);
-    assertThat(storage.listRunning()).isEmpty();
-    assertThat(persistedView.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
+    assertThatStream(persistedView.streamRunning()).isEmpty();
   }
 
   @Test
@@ -110,7 +113,7 @@
     String key = storage.create(REF_UPDATE);
     String secondKey = storage.create(REF_UPDATE);
     assertEquals(key, secondKey);
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
   }
 
   @Test
@@ -124,7 +127,7 @@
 
     String keyA = storage.create(REF_UPDATE);
     String keyB = storage.create(updateB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
   }
 
@@ -141,12 +144,12 @@
     storage.create(updateB);
 
     storage.start(uriUpdates);
-    assertThat(storage.listWaiting()).containsExactly(updateB);
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
 
     storage.start(uriUpdatesB);
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB);
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
   }
 
   @Test
@@ -164,10 +167,10 @@
     storage.start(uriUpdatesB);
 
     storage.finish(uriUpdates);
-    assertThat(storage.listRunning()).containsExactly(updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(updateB);
 
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -183,7 +186,7 @@
     storage.create(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
   }
 
   @Test
@@ -193,7 +196,7 @@
 
     String keyA = storage.create(refA);
     String keyB = storage.create(refB);
-    assertThat(storage.listWaiting()).hasSize(2);
+    assertThatStream(storage.streamWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
   }
 
@@ -209,10 +212,10 @@
     storage.start(uriUpdatesB);
 
     storage.finish(uriUpdatesA);
-    assertThat(storage.listRunning()).containsExactly(refUpdateB);
+    assertThatStream(storage.streamRunning()).containsExactly(refUpdateB);
 
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -221,8 +224,8 @@
     storage.start(uriUpdates);
 
     storage.reset(uriUpdates);
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -232,8 +235,8 @@
     storage.reset(uriUpdates);
 
     storage.start(uriUpdates);
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
     assertNoIncompleteTasks(storage);
@@ -251,8 +254,8 @@
     storage.start(uriUpdates);
 
     storage.resetAll();
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   @Test
@@ -262,8 +265,8 @@
     storage.resetAll();
 
     storage.start(uriUpdates);
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
     assertNoIncompleteTasks(storage);
@@ -284,7 +287,7 @@
     storage.start(uriUpdatesB);
 
     storage.resetAll();
-    assertThat(storage.listWaiting()).containsExactly(REF_UPDATE, updateB);
+    assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB);
   }
 
   @Test
@@ -303,12 +306,12 @@
     storage.resetAll();
 
     storage.start(uriUpdates);
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
-    assertThat(storage.listWaiting()).containsExactly(updateB);
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+    assertThatStream(storage.streamWaiting()).containsExactly(updateB);
 
     storage.start(uriUpdatesB);
-    assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB);
-    assertThat(storage.listWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+    assertThatStream(storage.streamWaiting()).isEmpty();
 
     storage.finish(uriUpdates);
     storage.finish(uriUpdatesB);
@@ -347,12 +350,16 @@
 
     storage.finish(uriUpdates);
     storage.finish(uriUpdatesB);
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
   }
 
   private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
-    assertThat(storage.listWaiting()).isEmpty();
-    assertThat(storage.listRunning()).isEmpty();
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertThatStream(storage.streamRunning()).isEmpty();
+  }
+
+  private IterableSubject assertThatStream(Stream<?> stream) {
+    return assertThat(stream.collect(Collectors.toList()));
   }
 
   public static URIish getUrish(String uri) {