ReplicationIT: fix flakiness

Fix the flakiness when checking for replication tasks stored
events on the filesystem and when replicating new projects.

Make use of the ReplicationTasksStorage for listing events
instead of listing the filesystem and allow disabling the deletion
so that the tests can have the time to verify the files existence.

Change-Id: Idfba7d177faed3c3ff45507b6f5b6aadb0bc5dd0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index a8d075d..64397f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -16,6 +16,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.hash.Hashing;
 import com.google.gson.Gson;
@@ -35,6 +36,8 @@
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
+  private boolean disableDeleteForTesting;
+
   public static class ReplicateRefUpdate {
     public final String project;
     public final String ref;
@@ -81,11 +84,21 @@
     return eventKey;
   }
 
+  @VisibleForTesting
+  public void disableDeleteForTesting(boolean deleteDisabled) {
+    this.disableDeleteForTesting = deleteDisabled;
+  }
+
   public void delete(ReplicateRefUpdate r) {
     String taskJson = GSON.toJson(r) + "\n";
     String taskKey = sha1(taskJson).name();
     Path file = refUpdates().resolve(taskKey);
 
+    if (disableDeleteForTesting) {
+      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
+      return;
+    }
+
     try {
       logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
       Files.delete(file);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 23148f8..f858350 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,18 +27,14 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -67,7 +63,7 @@
   private Path gitPath;
   private Path storagePath;
   private FileBasedConfig config;
-  private Gson GSON = new Gson();
+  private ReplicationTasksStorage tasksStorage;
 
   @Override
   public void setUpTestPlugin() throws Exception {
@@ -85,19 +81,21 @@
 
     pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
     storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    tasksStorage.disableDeleteForTesting(true);
   }
 
   @Test
   public void shouldReplicateNewProject() throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
-    waitForEmptyTasks();
 
     Project.NameKey sourceProject = createProject("foo");
 
     assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
 
-    waitUntil(() -> gitPath.resolve(sourceProject + "replica.git").toFile().isDirectory());
+    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -109,7 +107,6 @@
 
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
-    waitForEmptyTasks();
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
@@ -130,7 +127,6 @@
   public void shouldReplicateNewBranch() throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
-    waitForEmptyTasks();
 
     Project.NameKey targetProject = createProject("projectreplica");
     String newBranch = "refs/heads/mybranch";
@@ -160,7 +156,6 @@
     setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
     setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
     reloadConfig();
-    waitForEmptyTasks();
 
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
@@ -192,12 +187,16 @@
 
     setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
     setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
     reloadConfig();
-    waitForEmptyTasks();
 
     createChange();
 
     assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
   }
 
   @Test
@@ -262,39 +261,26 @@
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
   }
 
-  private void waitForEmptyTasks() throws InterruptedException {
-    waitUntil(
-        () -> {
-          try {
-            return listReplicationTasks(".*").size() == 0;
-          } catch (Exception e) {
-            logger.atSevere().withCause(e).log("Failed to list replication tasks");
-            throw new IllegalStateException(e);
-          }
-        });
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
   }
 
-  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) throws IOException {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    List<ReplicateRefUpdate> tasks = new ArrayList<>();
+  private void cleanupReplicationTasks() throws IOException {
     try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
       for (Path path : files) {
-        ReplicateRefUpdate task = readTask(path);
-        if (refmaskPattern.matcher(task.ref).matches()) {
-          tasks.add(readTask(path));
-        }
+        path.toFile().delete();
       }
     }
-
-    return tasks;
   }
 
-  private ReplicateRefUpdate readTask(Path file) {
-    try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
-      return GSON.fromJson(reader, ReplicateRefUpdate.class);
+  private boolean projectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return true;
     } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to read replication task %s", file);
-      throw new IllegalStateException(e);
+      return false;
     }
   }
 }