Synchronize access to ReplicationTasksStorage

The ReplicationTasksStorage can be subject to concurrency issues when a
replication task is moved across directories (waiting/running/building)
concurrently with the listing.

The result of the uncontrolled concurrency could be lead to:

1. Flaky tests because of the replication tasks found two or more times
   in different directories
2. Flaky tests because of the failure to list replication tasks
   that are escaping across directories because of the rename
3. File-based exceptions when replication tasks are moved concurrently
   by two threads to different directories.

The replication tasks storage is supposed to contain only small files and only
in-flight operations: the overhead of the additional synchronisation is thus
negligible compared to the overall latency of the replication itself.

To eliminate all residual latency, cleanup all the replication tasks
on all subdirectories at the start of the tests.

Bug: Issue 11843
Change-Id: I5f6293b3f22f0943df79f8ab2cb2c217210e5236
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5564925..8130051 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -85,19 +85,20 @@
 
   private static Gson GSON = new Gson();
 
+  private final Path refUpdates;
   private final Path buildingUpdates;
   private final Path runningUpdates;
   private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
-    Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
     buildingUpdates = refUpdates.resolve("building");
     runningUpdates = refUpdates.resolve("running");
     waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String create(ReplicateRefUpdate r) {
+  public synchronized String create(ReplicateRefUpdate r) {
     return new Task(r).create();
   }
 
@@ -106,44 +107,49 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public void start(PushOne push) {
+  public synchronized void start(PushOne push) {
     for (String ref : push.getRefs()) {
       new Task(new ReplicateRefUpdate(push, ref)).start();
     }
   }
 
-  public void reset(PushOne push) {
+  public synchronized void reset(PushOne push) {
     for (String ref : push.getRefs()) {
       new Task(new ReplicateRefUpdate(push, ref)).reset();
     }
   }
 
-  public void resetAll() {
-    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+  public synchronized void resetAll() {
+    for (ReplicateRefUpdate r : listRunning()) {
       new Task(r).reset();
     }
   }
 
-  public void finish(PushOne push) {
+  public synchronized void finish(PushOne push) {
     for (String ref : push.getRefs()) {
       new Task(new ReplicateRefUpdate(push, ref)).finish();
     }
   }
 
-  public List<ReplicateRefUpdate> listWaiting() {
+  public synchronized List<ReplicateRefUpdate> listWaiting() {
     return list(createDir(waitingUpdates));
   }
 
   @VisibleForTesting
-  public List<ReplicateRefUpdate> listRunning() {
+  public synchronized List<ReplicateRefUpdate> listRunning() {
     return list(createDir(runningUpdates));
   }
 
   @VisibleForTesting
-  public List<ReplicateRefUpdate> listBuilding() {
+  public synchronized List<ReplicateRefUpdate> listBuilding() {
     return list(createDir(buildingUpdates));
   }
 
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> list() {
+    return list(createDir(refUpdates));
+  }
+
   private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
     try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -151,6 +157,8 @@
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
           results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        } else if (Files.isDirectory(e)) {
+          results.addAll(list(e));
         }
       }
     } catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 41f93b3..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -400,18 +399,23 @@
 
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return Stream.concat(
-            tasksStorage.listWaiting().stream(),
-            Stream.concat(
-                tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream()))
+    return tasksStorage.list().stream()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }
 
-  private void cleanupReplicationTasks() throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
       for (Path path : files) {
-        path.toFile().delete();
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
       }
     }
   }