Add tests for asynchronous fetch replication

Make sure that ref/batch-ref updates are tested
in asynchronous fetch replication as well, managing also
the situation where replication tasks are retrying and
update/delete need to be applied in the correct order.
Test timeout is set to a maximum of 2-times the replication
delay, rather than 2k-times which was leading to
tests starvation after over 2h (4000 sec) of waiting for a
condition to be completed.

Change-Id: I0984ac1af805ac79b3c4dd5e96cf122f671ba3b6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 86a44a1..c69b45a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -222,13 +222,26 @@
       fetchAllRefs = true;
       repLog.trace("[{}] Added all refs for replication from {}", taskIdHex, uri);
     } else if (!fetchAllRefs) {
+      // The ref-spec could be already present as either
+      // update or delete, therefore a simple addition to the
+      // set may risk to have both update and delete for the same
+      // ref, which would be an issue: the delta is an unordered set
+      // and executing updates and deletes for the same ref may have
+      // unexpected results, depending on which one you execute firt.
+      //
+      // Delete any previous ref-spec as delete or update for the same
+      // refname so that the subsequent addition will make sure that
+      // there is only one operation per refName.
+      delta.remove(ref);
+      delta.remove(FetchRefSpec.fromRef(ref.refName()));
+
       delta.add(ref);
       repLog.trace("[{}] Added ref {} for replication from {}", taskIdHex, ref, uri);
     }
   }
 
   Set<FetchRefSpec> getRefSpecs() {
-    return fetchAllRefs ? Set.of(FetchRefSpec.fromRef(ALL_REFS)) : delta;
+    return fetchAllRefs ? Set.of(FetchRefSpec.fromRef(ALL_REFS)) : Set.copyOf(delta);
   }
 
   Set<String> getRefs() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 9168c29..4d30c97 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -656,7 +656,22 @@
           pendingFetchOp.canceledByReplication();
           pending.remove(uri);
 
+          Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs();
           fetchOp.addRefs(pendingFetchOp.getRefSpecs());
+
+          if (reason == RetryReason.COLLISION) {
+            // The fetch was never executed and is delayed
+            // because of a collision with an in-flight replication.
+            // The pending one was already in the queue and
+            // therefore is for sure older than fetchOp
+            // otherwise it would have been also rescheduled
+            // because of a collision.
+            // FetchOp has to take precedence over the pending
+            // operation that is for sure older, therefore its
+            // initial ref-specs need to be reapplied.
+            fetchOp.addRefs(fetchOpRefSpecs);
+          }
+
           fetchOp.addStates(pendingFetchOp.getStates());
           pendingFetchOp.removeStates();
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java
new file mode 100644
index 0000000..c5e9b5f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import java.util.List;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class AsyncPullReplicationBatchRefUpdatedIT extends PullReplicationITBase {
+  private static final int ASYNC_REPLICATION_DELAY_SEC = 2;
+
+  @Override
+  protected List<String> syncRefs() {
+    return List.of("NONE");
+  }
+
+  @Override
+  protected int replicationDelaySec() {
+    return ASYNC_REPLICATION_DELAY_SEC;
+  }
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java
new file mode 100644
index 0000000..3e481e2
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import java.util.List;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class AsyncPullReplicationRefUpdatedIT extends PullReplicationITBase {
+  private static final int ASYNC_REPLICATION_DELAY_SEC = 2;
+
+  @Override
+  protected List<String> syncRefs() {
+    return List.of("NONE");
+  }
+
+  @Override
+  protected int replicationDelaySec() {
+    return ASYNC_REPLICATION_DELAY_SEC;
+  }
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
index e1010d9..84c8525 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -39,6 +39,7 @@
 import com.googlesource.gerrit.plugins.replication.api.ApiModule;
 import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -61,6 +62,10 @@
   private static final int TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC = 5;
   private static final String TEST_PLUGIN_NAME = "pull-replication";
 
+  protected Duration testTimeoutDuration() {
+    return Duration.ofSeconds(TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC * 2);
+  }
+
   public static class PullReplicationHealthCheckTestModule extends AbstractModule {
     private final PullReplicationModule pullReplicationModule;
 
@@ -151,7 +156,7 @@
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
     config.setInt("remote", remoteName, "timeout", 600);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec());
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
     config.setInt("replication", null, "maxApiPayloadSize", 1);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index e3a5a28..30bff1b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -149,7 +149,7 @@
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
     config.setInt("remote", remoteName, "timeout", 600);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec());
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
index f3381e6..a6058d3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
@@ -20,6 +20,7 @@
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
 import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS;
 
+import com.google.gerrit.acceptance.PushOneCommit;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
@@ -66,12 +67,17 @@
     config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
     config.setInt("remote", remoteName, "timeout", 600);
     config.setBoolean("remote", remoteName, "mirror", true);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec());
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
+    config.setStringList("replication", null, "syncRefs", syncRefs());
     config.save();
   }
 
+  protected List<String> syncRefs() {
+    return List.of();
+  }
+
   @Override
   public void setUpTestPlugin() throws Exception {
     setUpTestPlugin(false);
@@ -173,6 +179,39 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  public void shouldReplicateBranchUpdatedAndThenDeleted() throws Exception {
+    String testProjectName = project + TEST_REPLICATION_SUFFIX;
+    createTestProject(testProjectName);
+
+    String master = "refs/heads/master";
+    String initial = gApi.projects().name(testProjectName).branch(master).get().revision;
+    testRepo = cloneProject(NameKey.parse(testProjectName));
+    PushOneCommit push =
+        pushFactory.create(admin.newIdent(), testRepo, "change1", "a.txt", "content");
+    PushOneCommit.Result result = push.to("refs/heads/master");
+    result.assertOkStatus();
+
+    String updated = gApi.projects().name(testProjectName).branch(master).get().revision;
+
+    ReplicationQueue pullReplicationQueue =
+        plugin.getSysInjector().getInstance(ReplicationQueue.class);
+    ProjectEvent event =
+        generateUpdateEvent(project, master, initial, updated, TEST_REPLICATION_REMOTE);
+    pullReplicationQueue.onEvent(event);
+
+    gApi.projects().name(testProjectName).branch(master).delete();
+    ProjectEvent deleteEvent =
+        generateUpdateEvent(
+            project, master, updated, ObjectId.zeroId().getName(), TEST_REPLICATION_REMOTE);
+    pullReplicationQueue.onEvent(deleteEvent);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, master) == null);
+    }
+  }
+
+  @Test
   @UseLocalDisk
   @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateForceUpdatedBranch() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index ed71545..fe5f17b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -53,8 +53,6 @@
 
   protected static final Optional<String> ALL_PROJECTS = Optional.empty();
   protected static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  protected static final int TEST_REPLICATION_DELAY = 1;
-  protected static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000);
   protected static final String TEST_REPLICATION_SUFFIX = "suffix1";
   protected static final String TEST_REPLICATION_REMOTE = "remote1";
   @Inject protected SitePaths sitePaths;
@@ -64,6 +62,14 @@
   protected FileBasedConfig config;
   protected FileBasedConfig secureConfig;
 
+  protected int replicationDelaySec() {
+    return 1;
+  }
+
+  protected Duration testTimeoutDuration() {
+    return Duration.ofSeconds(replicationDelaySec() * 2L);
+  }
+
   protected abstract boolean useBatchRefUpdateEvent();
 
   protected ProjectEvent generateUpdateEvent(
@@ -134,7 +140,7 @@
   }
 
   protected void waitUntil(Supplier<Boolean> waitCondition) throws Exception {
-    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+    WaitUtil.waitUntil(waitCondition, testTimeoutDuration());
   }
 
   protected <T> T getInstance(Class<T> classObj) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
index d0c25e4..c9ef1b3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
@@ -50,7 +50,7 @@
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
     config.setInt("remote", remoteName, "timeout", 600);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec());
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
index f51376d..8aedb52 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
@@ -79,7 +79,7 @@
       assertThat(targetChangeRef.getObjectId()).isEqualTo(changeCommit.getId());
 
       // ensure that previous fetch was finished
-      Thread.sleep(Duration.ofSeconds(TEST_REPLICATION_DELAY).toMillis());
+      Thread.sleep(Duration.ofSeconds(replicationDelaySec()).toMillis());
 
       Ref sourceNewRef = createNewRef();