ReplicationTaskStorage: Use Stream API instead of List
Change-Id: I76c23bf472ebc0cad804207699fdca0e9cf4de14
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index cd4a3fb..90d159d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -202,13 +202,17 @@
replaying = true;
try {
replaying = true;
- for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
- try {
- fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref());
- } catch (URISyntaxException e) {
- repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
- }
- }
+ replicationTasksStorage
+ .streamWaiting()
+ .forEach(
+ t -> {
+ try {
+ fire(new URIish(t.uri()), Project.nameKey(t.project()), t.ref());
+ } catch (URISyntaxException e) {
+ repLog.atSevere().withCause(e).log(
+ "Encountered malformed URI for persisted event %s", t);
+ }
+ });
} catch (Throwable e) {
repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
} finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 68661cf..c67bb49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,9 +32,7 @@
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
-import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.URIish;
@@ -147,9 +145,7 @@
}
public synchronized void resetAll() {
- for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
- new Task(r).reset();
- }
+ streamRunning().forEach(r -> new Task(r).reset());
}
public boolean isWaiting(UriUpdates uriUpdates) {
@@ -164,16 +160,12 @@
}
}
- public List<ReplicateRefUpdate> listWaiting() {
- return list(createDir(waitingUpdates));
+ public Stream<ReplicateRefUpdate> streamWaiting() {
+ return streamRecursive(createDir(waitingUpdates));
}
- public List<ReplicateRefUpdate> listRunning() {
- return list(createDir(runningUpdates));
- }
-
- private List<ReplicateRefUpdate> list(Path taskDir) {
- return streamRecursive(taskDir).collect(Collectors.toList());
+ public Stream<ReplicateRefUpdate> streamRunning() {
+ return streamRecursive(createDir(runningUpdates));
}
private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 7e336d1..7f8282a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -247,7 +247,7 @@
private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
synchronized (tasksStorage) {
- return Stream.concat(tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream())
+ return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning())
.filter(task -> refmaskPattern.matcher(task.ref()).matches())
.collect(toList());
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index f0d2347..e0e3380 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -49,6 +49,7 @@
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
@@ -453,11 +454,11 @@
config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
config.save();
reloadConfig();
- assertThat(tasksStorage.listRunning()).hasSize(0);
+ assertThat(listRunning()).hasSize(0);
Project.NameKey sourceProject = createTestProject("task_cleanup_project");
waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
- waitUntil(() -> tasksStorage.listRunning().size() == 0);
+ waitUntil(() -> listRunning().size() == 0);
}
@Test
@@ -466,7 +467,7 @@
config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
config.save();
reloadConfig();
- assertThat(tasksStorage.listRunning()).hasSize(0);
+ assertThat(listRunning()).hasSize(0);
Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
@@ -492,7 +493,7 @@
.findFirst()
.get();
- waitUntil(() -> tasksStorage.listRunning().size() == 0);
+ waitUntil(() -> listRunning().size() == 0);
createTestProject(projectName);
@@ -525,7 +526,7 @@
String changeRef = createChange().getPatchSet().refName();
- changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+ changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
.forEach(
(update) -> {
try {
@@ -675,26 +676,33 @@
return projectOperations.newProject().name(name).create();
}
- private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
- Pattern refmaskPattern = Pattern.compile(refRegex);
- return streamIncompleteTasks()
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
- .collect(toList());
- }
-
- private List<ReplicateRefUpdate> listIncompleteTasks() {
- return streamIncompleteTasks().collect(toList());
+ public List<ReplicateRefUpdate> listRunning() {
+ return tasksStorage.streamRunning().collect(Collectors.toList());
}
@SuppressWarnings(
"SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
- private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
+ private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
+ Pattern refmaskPattern = Pattern.compile(refRegex);
synchronized (tasksStorage) {
- return Stream.concat(
- tasksStorage.listWaiting().stream(), tasksStorage.listRunning().stream());
+ return streamIncompleteTasks()
+ .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+ .collect(toList());
}
}
+ @SuppressWarnings(
+ "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
+ private List<ReplicateRefUpdate> listIncompleteTasks() {
+ synchronized (tasksStorage) {
+ return streamIncompleteTasks().collect(toList());
+ }
+ }
+
+ private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
+ return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning());
+ }
+
public void cleanupReplicationTasks() throws IOException {
cleanupReplicationTasks(storagePath);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 948044b..4b09d52 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -20,10 +20,13 @@
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
+import com.google.common.truth.IterableSubject;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.eclipse.jgit.transport.URIish;
import org.junit.After;
import org.junit.Before;
@@ -57,22 +60,22 @@
@Test
public void canListEmptyStorage() throws Exception {
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
public void canListWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
}
@Test
public void canStartWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
storage.start(uriUpdates);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
}
@Test
@@ -87,22 +90,22 @@
public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(persistedView.listWaiting()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(persistedView.streamWaiting()).isEmpty();
storage.create(REF_UPDATE);
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
- assertThat(persistedView.listWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
storage.start(uriUpdates);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(persistedView.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
- assertThat(persistedView.listRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(persistedView.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
storage.finish(uriUpdates);
- assertThat(storage.listRunning()).isEmpty();
- assertThat(persistedView.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
+ assertThatStream(persistedView.streamRunning()).isEmpty();
}
@Test
@@ -110,7 +113,7 @@
String key = storage.create(REF_UPDATE);
String secondKey = storage.create(REF_UPDATE);
assertEquals(key, secondKey);
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
}
@Test
@@ -124,7 +127,7 @@
String keyA = storage.create(REF_UPDATE);
String keyB = storage.create(updateB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
assertNotEquals(keyA, keyB);
}
@@ -141,12 +144,12 @@
storage.create(updateB);
storage.start(uriUpdates);
- assertThat(storage.listWaiting()).containsExactly(updateB);
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
storage.start(uriUpdatesB);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
}
@Test
@@ -164,10 +167,10 @@
storage.start(uriUpdatesB);
storage.finish(uriUpdates);
- assertThat(storage.listRunning()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(updateB);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -183,7 +186,7 @@
storage.create(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
}
@Test
@@ -193,7 +196,7 @@
String keyA = storage.create(refA);
String keyB = storage.create(refB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
assertNotEquals(keyA, keyB);
}
@@ -209,10 +212,10 @@
storage.start(uriUpdatesB);
storage.finish(uriUpdatesA);
- assertThat(storage.listRunning()).containsExactly(refUpdateB);
+ assertThatStream(storage.streamRunning()).containsExactly(refUpdateB);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -221,8 +224,8 @@
storage.start(uriUpdates);
storage.reset(uriUpdates);
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -232,8 +235,8 @@
storage.reset(uriUpdates);
storage.start(uriUpdates);
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
storage.finish(uriUpdates);
assertNoIncompleteTasks(storage);
@@ -251,8 +254,8 @@
storage.start(uriUpdates);
storage.resetAll();
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -262,8 +265,8 @@
storage.resetAll();
storage.start(uriUpdates);
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
storage.finish(uriUpdates);
assertNoIncompleteTasks(storage);
@@ -284,7 +287,7 @@
storage.start(uriUpdatesB);
storage.resetAll();
- assertThat(storage.listWaiting()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB);
}
@Test
@@ -303,12 +306,12 @@
storage.resetAll();
storage.start(uriUpdates);
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE);
- assertThat(storage.listWaiting()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(updateB);
storage.start(uriUpdatesB);
- assertThat(storage.listRunning()).containsExactly(REF_UPDATE, updateB);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting()).isEmpty();
storage.finish(uriUpdates);
storage.finish(uriUpdatesB);
@@ -347,12 +350,16 @@
storage.finish(uriUpdates);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
+ }
+
+ private IterableSubject assertThatStream(Stream<?> stream) {
+ return assertThat(stream.collect(Collectors.toList()));
}
public static URIish getUrish(String uri) {