Merge branch 'stable-3.1'

* stable-3.1:
  Synchronize access to ReplicationTasksStorage

Change-Id: I43047730142e2d2324ca8cc929a4e69e61c995c5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index e827d4f..a557eb7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -113,19 +113,20 @@
 
   private static Gson GSON = new Gson();
 
+  private final Path refUpdates;
   private final Path buildingUpdates;
   private final Path runningUpdates;
   private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
-    Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
     buildingUpdates = refUpdates.resolve("building");
     runningUpdates = refUpdates.resolve("running");
     waitingUpdates = refUpdates.resolve("waiting");
   }
 
-  public String create(ReplicateRefUpdate r) {
+  public synchronized String create(ReplicateRefUpdate r) {
     return new Task(r).create();
   }
 
@@ -134,7 +135,7 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public boolean start(PushOne push) {
+  public synchronized boolean start(PushOne push) {
     UriLock lock = new UriLock(push);
     if (!lock.acquire()) {
       return false;
@@ -151,7 +152,7 @@
     return started;
   }
 
-  public void reset(PushOne push) {
+  public synchronized void reset(PushOne push) {
     UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
       new Task(lock, ref).reset();
@@ -159,7 +160,7 @@
     lock.release();
   }
 
-  public void resetAll() {
+  public synchronized void resetAll() {
     try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
       for (Path dir : dirs) {
         UriLock lock = null;
@@ -190,20 +191,25 @@
     lock.release();
   }
 
-  public List<ReplicateRefUpdate> listWaiting() {
+  public synchronized List<ReplicateRefUpdate> listWaiting() {
     return list(createDir(waitingUpdates));
   }
 
   @VisibleForTesting
-  public List<ReplicateRefUpdate> listRunning() {
+  public synchronized List<ReplicateRefUpdate> listRunning() {
     return list(createDir(runningUpdates));
   }
 
   @VisibleForTesting
-  public List<ReplicateRefUpdate> listBuilding() {
+  public synchronized List<ReplicateRefUpdate> listBuilding() {
     return list(createDir(buildingUpdates));
   }
 
+  @VisibleForTesting
+  public synchronized List<ReplicateRefUpdate> list() {
+    return list(createDir(refUpdates));
+  }
+
   private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
     try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -211,6 +217,8 @@
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
           results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+        } else if (Files.isDirectory(e)) {
+          results.addAll(list(e));
         }
       }
     } catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 41f93b3..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -400,18 +399,23 @@
 
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return Stream.concat(
-            tasksStorage.listWaiting().stream(),
-            Stream.concat(
-                tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream()))
+    return tasksStorage.list().stream()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }
 
-  private void cleanupReplicationTasks() throws IOException {
-    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+  public void cleanupReplicationTasks() throws IOException {
+    cleanupReplicationTasks(storagePath);
+  }
+
+  private void cleanupReplicationTasks(Path basePath) throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
       for (Path path : files) {
-        path.toFile().delete();
+        if (Files.isDirectory(path)) {
+          cleanupReplicationTasks(path);
+        } else {
+          path.toFile().delete();
+        }
       }
     }
   }