Merge "ReplicationTasksStorage: Add multi-primary unit tests" into stable-3.1
diff --git a/BUILD b/BUILD
index 50615d8..72a3fc8 100644
--- a/BUILD
+++ b/BUILD
@@ -23,6 +23,18 @@
     name = "replication_tests",
     srcs = glob([
         "src/test/java/**/*Test.java",
+    ]),
+    tags = ["replication"],
+    visibility = ["//visibility:public"],
+    deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+        ":replication__plugin",
+        ":replication_util",
+    ],
+)
+
+junit_tests(
+    name = "replication_it",
+    srcs = glob([
         "src/test/java/**/*IT.java",
     ]),
     tags = ["replication"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 6a89e80..0a34d11 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -81,7 +81,9 @@
       destinations.get().startup(workQueue);
       running = true;
       replicationTasksStorage.resetAll();
-      firePendingEvents();
+      Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
+      t.setDaemon(true);
+      t.start();
       fireBeforeStartupEvents();
     }
   }
@@ -198,6 +200,8 @@
           repLog.error("Encountered malformed URI for persisted event %s", t);
         }
       }
+    } catch (Throwable e) {
+      repLog.error("Unexpected error while firing pending events", e);
     } finally {
       replaying = false;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 048767f..ead218b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -60,8 +60,6 @@
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private boolean disableDeleteForTesting;
-
   public static class ReplicateRefUpdate {
     public final String project;
     public final String ref;
@@ -103,11 +101,6 @@
     return new Task(r).create();
   }
 
-  @VisibleForTesting
-  public void disableDeleteForTesting(boolean deleteDisabled) {
-    this.disableDeleteForTesting = deleteDisabled;
-  }
-
   public synchronized void start(UriUpdates uriUpdates) {
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       new Task(update).start();
@@ -132,11 +125,11 @@
     }
   }
 
-  public synchronized List<ReplicateRefUpdate> listWaiting() {
+  public List<ReplicateRefUpdate> listWaiting() {
     return list(createDir(waitingUpdates));
   }
 
-  public synchronized List<ReplicateRefUpdate> listRunning() {
+  public List<ReplicateRefUpdate> listRunning() {
     return list(createDir(runningUpdates));
   }
 
@@ -226,11 +219,6 @@
     }
 
     public void finish() {
-      if (disableDeleteForTesting) {
-        logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
-        return;
-      }
-
       try {
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 8291421..e12ec92 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -11,8 +11,7 @@
 ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
   [--now]
   [--wait]
-  [--url <PATTERN>]
-  {--all | <PROJECT PATTERN> ...}
+  {--url <PATTERN> | [--url <PATTERN>] --all | [--url <PATTERN>] <PROJECT PATTERN> ...}
 ```
 
 DESCRIPTION
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index f06b933..c32a55d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -86,7 +86,6 @@
     storagePath = pluginDataDir.resolve("ref-updates");
     tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
     cleanupReplicationTasks();
-    tasksStorage.disableDeleteForTesting(true);
   }
 
   @After
@@ -156,13 +155,13 @@
   @Test
   public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
     List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
-    createTestProject(project + "replica1");
-    createTestProject(project + "replica2");
 
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest1.save();
+    dest2.save();
     reloadConfig();
 
     createChange();
@@ -191,7 +190,7 @@
     setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
   }
 
-  private void setReplicationDestination(
+  private FileBasedConfig setReplicationDestination(
       String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
       throws IOException {
     FileBasedConfig remoteConfig =
@@ -200,6 +199,7 @@
             FS.DETECTED);
 
     setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
+    return remoteConfig;
   }
 
   private void setAutoReload() throws IOException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index bfabc9f..3c6ab0c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -65,9 +65,14 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int TEST_REPLICATION_DELAY = 1;
   private static final int TEST_REPLICATION_RETRY = 1;
+  private static final int TEST_PROJECT_CREATION_SECONDS = 10;
   private static final Duration TEST_TIMEOUT =
       Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
 
+  private static final Duration TEST_NEW_PROJECT_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + TEST_PROJECT_CREATION_SECONDS);
+
   @Inject private SitePaths sitePaths;
   @Inject private ProjectOperations projectOperations;
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
@@ -95,7 +100,6 @@
     storagePath = pluginDataDir.resolve("ref-updates");
     tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
     cleanupReplicationTasks();
-    tasksStorage.disableDeleteForTesting(true);
   }
 
   @Test
@@ -105,9 +109,9 @@
 
     Project.NameKey sourceProject = createTestProject("foo");
 
-    assertThat(listIncompleteTasks("refs/meta/config")).hasSize(1);
-
-    waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
     assertThat(replicaProject).isNotNull();
@@ -152,8 +156,6 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
-
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
 
@@ -175,8 +177,6 @@
     input.revision = master;
     gApi.projects().name(project.get()).branch(newBranch).create(input);
 
-    assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
-
     try (Repository repo = repoManager.openRepository(targetProject);
         Repository sourceRepo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, newBranch) != null);
@@ -201,8 +201,6 @@
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
 
-    assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
-
     try (Repository repo1 = repoManager.openRepository(targetProject1);
         Repository repo2 = repoManager.openRepository(targetProject2)) {
       waitUntil(
@@ -225,10 +223,12 @@
     createTestProject(project + "replica1");
     createTestProject(project + "replica2");
 
-    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
-    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
-    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
-    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    dest1.save();
+    dest2.save();
     reloadConfig();
 
     createChange();
@@ -251,7 +251,7 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks(".*all.*")).hasSize(1);
+    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
   }
 
   @Test
@@ -269,10 +269,8 @@
         .getInstance(ReplicationQueue.class)
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
-    assertThat(listIncompleteTasks(".*all.*")).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : listIncompleteTasks()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-    }
+    assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
   }
 
   @Test
@@ -291,9 +289,7 @@
         .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
 
     assertThat(listIncompleteTasks()).hasSize(1);
-    for (ReplicationTasksStorage.ReplicateRefUpdate task : listIncompleteTasks()) {
-      assertThat(task.uri).isEqualTo(expectedURI);
-    }
+    streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
   }
 
   @Test
@@ -329,8 +325,6 @@
   }
 
   private void replicateBranchDeletion(boolean mirror) throws Exception {
-    tasksStorage.disableDeleteForTesting(false);
-
     setReplicationDestination("foo", "replica", ALL_PROJECTS, mirror);
     reloadConfig();
 
@@ -436,7 +430,6 @@
 
   @Test
   public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
-    tasksStorage.disableDeleteForTesting(false);
     setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
     config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
     config.save();
@@ -462,7 +455,6 @@
 
     String changeRef = createChange().getPatchSet().refName();
 
-    tasksStorage.disableDeleteForTesting(false);
     changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
         .forEach(
             (update) -> {
@@ -473,7 +465,6 @@
               } catch (URISyntaxException e) {
               }
             });
-    tasksStorage.disableDeleteForTesting(true);
 
     setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
     setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
@@ -534,13 +525,14 @@
         remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
   }
 
-  private void setReplicationDestination(
+  private FileBasedConfig setReplicationDestination(
       String remoteName, List<String> replicaSuffixes, Optional<String> project)
       throws IOException {
-    setReplicationDestination(remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
+    return setReplicationDestination(
+        remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
   }
 
-  private void setReplicationDestination(
+  private FileBasedConfig setReplicationDestination(
       String remoteName,
       List<String> replicaSuffixes,
       Optional<String> project,
@@ -558,6 +550,7 @@
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
+    return config;
   }
 
   private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index fecbd31..97992c8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -71,6 +71,14 @@
   }
 
   @Test
+  public void canStartWaitingUpdate() throws Exception { // checks create() followed by start()
+    storage.create(REF_UPDATE);
+    storage.start(uriUpdates);
+    assertThat(storage.listWaiting()).isEmpty(); // nothing may remain waiting after start
+    assertContainsExactly(storage.listRunning(), REF_UPDATE); // the update is now running
+  }
+
+  @Test
   public void canFinishRunningUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
@@ -92,6 +100,8 @@
     storage.start(uriUpdates);
     assertThat(storage.listWaiting()).isEmpty();
     assertThat(persistedView.listWaiting()).isEmpty();
+    assertContainsExactly(storage.listRunning(), REF_UPDATE);
+    assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
 
     storage.finish(uriUpdates);
     assertThat(storage.listRunning()).isEmpty();