Synchronize access to ReplicationTasksStorage The ReplicationTasksStorage can be subject to concurrency issues when a replication task is moved across directories (waiting/running/building) concurrently with the listing. The result of the uncontrolled concurrency could be lead to: 1. Flaky tests because of the replication tasks found two or more times in different directories 2. Flaky tests because of the failure to list replication tasks that are escaping across directories because of the rename 3. File-based exceptions when replication tasks are moved concurrently by two threads to different directories. The replication tasks storage is supposed to contain only small files and only in-flight operations: the overhead of the additional synchronisation is thus negligible compared to the overall latency of the replication itself. To eliminate all residual latency, cleanup all the replication tasks on all subdirectories at the start of the tests. Bug: Issue 11843 Change-Id: I5f6293b3f22f0943df79f8ab2cb2c217210e5236
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 5564925..8130051 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -85,19 +85,20 @@ private static Gson GSON = new Gson(); + private final Path refUpdates; private final Path buildingUpdates; private final Path runningUpdates; private final Path waitingUpdates; @Inject ReplicationTasksStorage(ReplicationConfig config) { - Path refUpdates = config.getEventsDirectory().resolve("ref-updates"); + refUpdates = config.getEventsDirectory().resolve("ref-updates"); buildingUpdates = refUpdates.resolve("building"); runningUpdates = refUpdates.resolve("running"); waitingUpdates = refUpdates.resolve("waiting"); } - public String create(ReplicateRefUpdate r) { + public synchronized String create(ReplicateRefUpdate r) { return new Task(r).create(); } @@ -106,44 +107,49 @@ this.disableDeleteForTesting = deleteDisabled; } - public void start(PushOne push) { + public synchronized void start(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).start(); } } - public void reset(PushOne push) { + public synchronized void reset(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).reset(); } } - public void resetAll() { - for (ReplicateRefUpdate r : list(createDir(runningUpdates))) { + public synchronized void resetAll() { + for (ReplicateRefUpdate r : listRunning()) { new Task(r).reset(); } } - public void finish(PushOne push) { + public synchronized void finish(PushOne push) { for (String ref : push.getRefs()) { new Task(new ReplicateRefUpdate(push, ref)).finish(); } } - public List<ReplicateRefUpdate> listWaiting() { + public synchronized List<ReplicateRefUpdate> listWaiting() { return list(createDir(waitingUpdates)); } @VisibleForTesting - public List<ReplicateRefUpdate> listRunning() { + public synchronized List<ReplicateRefUpdate> listRunning() { return list(createDir(runningUpdates)); } @VisibleForTesting - public List<ReplicateRefUpdate> listBuilding() { + public synchronized List<ReplicateRefUpdate> listBuilding() { return list(createDir(buildingUpdates)); } + @VisibleForTesting + public synchronized List<ReplicateRefUpdate> list() { + return list(createDir(refUpdates)); + } + private List<ReplicateRefUpdate> list(Path tasks) { List<ReplicateRefUpdate> results = new ArrayList<>(); try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) { @@ -151,6 +157,8 @@ if (Files.isRegularFile(e)) { String json = new String(Files.readAllBytes(e), UTF_8); results.add(GSON.fromJson(json, ReplicateRefUpdate.class)); + } else if (Files.isDirectory(e)) { + results.addAll(list(e)); } } } catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 41f93b3..9cf5489 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@ import java.util.Optional; import java.util.function.Supplier; import java.util.regex.Pattern; -import java.util.stream.Stream; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; @@ -400,18 +399,23 @@ private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) { Pattern refmaskPattern = Pattern.compile(refRegex); - return Stream.concat( - tasksStorage.listWaiting().stream(), - Stream.concat( - tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream())) + return tasksStorage.list().stream() .filter(task -> refmaskPattern.matcher(task.ref).matches()) .collect(toList()); } - private void cleanupReplicationTasks() throws IOException { - try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) { + public void cleanupReplicationTasks() throws IOException { + cleanupReplicationTasks(storagePath); + } + + private void cleanupReplicationTasks(Path basePath) throws IOException { + try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) { for (Path path : files) { - path.toFile().delete(); + if (Files.isDirectory(path)) { + cleanupReplicationTasks(path); + } else { + path.toFile().delete(); + } } } }