ReplicationTaskStorage: Use Stream API instead of List Change-Id: I76c23bf472ebc0cad804207699fdca0e9cf4de14
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index cd4a3fb..90d159d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -202,13 +202,17 @@ replaying = true; try { replaying = true; - for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { - try { - fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref()); - } catch (URISyntaxException e) { - repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t); - } - } + replicationTasksStorage + .streamWaiting() + .forEach( + t -> { + try { + fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref()); + } catch (URISyntaxException e) { + repLog.atSevere().withCause(e).log( + "Encountered malformed URI for persisted event %s", t); + } + }); } catch (Throwable e) { repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); } finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 68661cf..c67bb49 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,9 +32,7 @@ import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; @@ -147,9 +145,7 @@ } public synchronized void resetAll() { - for (ReplicateRefUpdate r : list(createDir(runningUpdates))) { - new Task(r).reset(); - } + streamRunning().forEach(r -> new Task(r).reset()); } public boolean isWaiting(UriUpdates uriUpdates) { @@ -164,16 +160,12 @@ } } - public List<ReplicateRefUpdate> listWaiting() { - return list(createDir(waitingUpdates)); + public Stream<ReplicateRefUpdate> streamWaiting() { + return streamRecursive(createDir(waitingUpdates)); } - public List<ReplicateRefUpdate> listRunning() { - return list(createDir(runningUpdates)); - } - - private List<ReplicateRefUpdate> list(Path taskDir) { - return streamRecursive(taskDir).collect(Collectors.toList()); + public Stream<ReplicateRefUpdate> streamRunning() { + return streamRecursive(createDir(runningUpdates)); } private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java index 7e336d1..7f8282a 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -247,7 +247,7 @@ private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) { Pattern refmaskPattern = Pattern.compile(refRegex); synchronized (tasksStorage) { - return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream()) + return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning()) .filter(task -> refmaskPattern.matcher(task.ref()).matches()) .collect(toList()); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index f0d2347..e0e3380 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -49,6 +49,7 @@ import java.util.Optional; import java.util.function.Supplier; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Ref; @@ -453,11 +454,11 @@ config.setInt("remote", "task_cleanup_project", "replicationRetry", 0); config.save(); reloadConfig(); - assertThat(tasksStorage.listRunning()).hasSize(0); + assertThat(listRunning()).hasSize(0); Project.NameKey sourceProject = createTestProject("task_cleanup_project"); waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git"))); - waitUntil(() -> tasksStorage.listRunning().size() == 0); + waitUntil(() -> listRunning().size() == 0); } @Test @@ -466,7 +467,7 @@ config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0); config.save(); reloadConfig(); - assertThat(tasksStorage.listRunning()).hasSize(0); + assertThat(listRunning()).hasSize(0); Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project"); waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git"))); @@ -492,7 +493,7 @@ .findFirst() .get(); - waitUntil(() -> tasksStorage.listRunning().size() == 0); + waitUntil(() -> listRunning().size() == 0); createTestProject(projectName); @@ -525,7 +526,7 @@ String changeRef = createChange().getPatchSet().refName(); - changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1) + changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1) .forEach( (update) -> { try { @@ -675,26 +676,33 @@ return projectOperations.newProject().name(name).create(); } - private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) { - Pattern refmaskPattern = Pattern.compile(refRegex); - return streamIncompleteTasks() - .filter(task -> refmaskPattern.matcher(task.ref()).matches()) - .collect(toList()); - } - - private List<ReplicateRefUpdate> listIncompleteTasks() { - return streamIncompleteTasks().collect(toList()); + public List<ReplicateRefUpdate> listRunning() { + return tasksStorage.streamRunning().collect(Collectors.toList()); } @SuppressWarnings( "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin() - private Stream<ReplicateRefUpdate> streamIncompleteTasks() { + private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) { + Pattern refmaskPattern = Pattern.compile(refRegex); synchronized (tasksStorage) { - return Stream.concat( - tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream()); + return streamIncompleteTasks() + .filter(task -> refmaskPattern.matcher(task.ref()).matches()) + .collect(toList()); } } + @SuppressWarnings( + "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin() + private List<ReplicateRefUpdate> listIncompleteTasks() { + synchronized (tasksStorage) { + return streamIncompleteTasks().collect(toList()); + } + } + + private Stream<ReplicateRefUpdate> streamIncompleteTasks() { + return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning()); + } + public void cleanupReplicationTasks() throws IOException { cleanupReplicationTasks(storagePath); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java index 948044b..4b09d52 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -20,10 +20,13 @@ import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; +import com.google.common.truth.IterableSubject; import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.net.URISyntaxException; import java.nio.file.FileSystem; import java.nio.file.Path; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.jgit.transport.URIish; import org.junit.After; import org.junit.Before; @@ -57,22 +60,22 @@ @Test public void canListEmptyStorage() throws Exception { - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test public void canListWaitingUpdate() throws Exception { storage.create(REF_UPDATE); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); } @Test public void canStartWaitingUpdate() throws Exception { storage.create(REF_UPDATE); storage.start(uriUpdates); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); } @Test @@ -87,22 +90,22 @@ public void instancesOfTheSameStorageHaveTheSameElements() throws Exception { ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(persistedView.listWaiting()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(persistedView.streamWaiting()).isEmpty(); storage.create(REF_UPDATE); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE); - assertThat(persistedView.listWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); storage.start(uriUpdates); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(persistedView.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); - assertThat(persistedView.listRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(persistedView.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); storage.finish(uriUpdates); - assertThat(storage.listRunning()).isEmpty(); - assertThat(persistedView.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); + assertThatStream(persistedView.streamRunning()).isEmpty(); } @Test @@ -110,7 +113,7 @@ String key = storage.create(REF_UPDATE); String secondKey = storage.create(REF_UPDATE); assertEquals(key, secondKey); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); } @Test @@ -124,7 +127,7 @@ String keyA = storage.create(REF_UPDATE); String keyB = storage.create(updateB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); assertNotEquals(keyA, keyB); } @@ -141,12 +144,12 @@ storage.create(updateB); storage.start(uriUpdates); - assertThat(storage.listWaiting()).containsExactly(updateB); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(updateB); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); storage.start(uriUpdatesB); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB); } @Test @@ -164,10 +167,10 @@ storage.start(uriUpdatesB); storage.finish(uriUpdates); - assertThat(storage.listRunning()).containsExactly(updateB); + assertThatStream(storage.streamRunning()).containsExactly(updateB); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test @@ -183,7 +186,7 @@ storage.create(updateB); storage.create(REF_UPDATE); storage.create(updateB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); } @Test @@ -193,7 +196,7 @@ String keyA = storage.create(refA); String keyB = storage.create(refB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); assertNotEquals(keyA, keyB); } @@ -209,10 +212,10 @@ storage.start(uriUpdatesB); storage.finish(uriUpdatesA); - assertThat(storage.listRunning()).containsExactly(refUpdateB); + assertThatStream(storage.streamRunning()).containsExactly(refUpdateB); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test @@ -221,8 +224,8 @@ storage.start(uriUpdates); storage.reset(uriUpdates); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test @@ -232,8 +235,8 @@ storage.reset(uriUpdates); storage.start(uriUpdates); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); - assertThat(storage.listWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); storage.finish(uriUpdates); assertNoIncompleteTasks(storage); @@ -251,8 +254,8 @@ storage.start(uriUpdates); storage.resetAll(); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test @@ -262,8 +265,8 @@ storage.resetAll(); storage.start(uriUpdates); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); - assertThat(storage.listWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); storage.finish(uriUpdates); assertNoIncompleteTasks(storage); @@ -284,7 +287,7 @@ storage.start(uriUpdatesB); storage.resetAll(); - assertThat(storage.listWaiting()).containsExactly(REF_UPDATE, updateB); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB); } @Test @@ -303,12 +306,12 @@ storage.resetAll(); storage.start(uriUpdates); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE); - assertThat(storage.listWaiting()).containsExactly(updateB); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(updateB); storage.start(uriUpdatesB); - assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB); - assertThat(storage.listWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB); + assertThatStream(storage.streamWaiting()).isEmpty(); storage.finish(uriUpdates); storage.finish(uriUpdatesB); @@ -347,12 +350,16 @@ storage.finish(uriUpdates); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } private void assertNoIncompleteTasks(ReplicationTasksStorage storage) { - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); + } + + private IterableSubject assertThatStream(Stream<?> stream) { + return assertThat(stream.collect(Collectors.toList())); } public static URIish getUrish(String uri) {