Add tests for asynchronous fetch replication Make sure that ref/batch-ref updates are tested in asynchronous fetch replication as well, managing also the situation where replication tasks are retrying and update/delete need to be applied in the correct order. Test timeout is set to a maximum of 2-times the replication delay, rather than 2k-times which was leading to tests starvation after over 2h (4000 sec) of waiting for a condition to be completed. Change-Id: I0984ac1af805ac79b3c4dd5e96cf122f671ba3b6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java index 86a44a1..c69b45a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -222,13 +222,26 @@ fetchAllRefs = true; repLog.trace("[{}] Added all refs for replication from {}", taskIdHex, uri); } else if (!fetchAllRefs) { + // The ref-spec could be already present as either + // update or delete, therefore a simple addition to the + // set may risk to have both update and delete for the same + // ref, which would be an issue: the delta is an unordered set + // and executing updates and deletes for the same ref may have + // unexpected results, depending on which one you execute firt. + // + // Delete any previous ref-spec as delete or update for the same + // refname so that the subsequent addition will make sure that + // there is only one operation per refName. + delta.remove(ref); + delta.remove(FetchRefSpec.fromRef(ref.refName())); + delta.add(ref); repLog.trace("[{}] Added ref {} for replication from {}", taskIdHex, ref, uri); } } Set<FetchRefSpec> getRefSpecs() { - return fetchAllRefs ? Set.of(FetchRefSpec.fromRef(ALL_REFS)) : delta; + return fetchAllRefs ? Set.of(FetchRefSpec.fromRef(ALL_REFS)) : Set.copyOf(delta); } Set<String> getRefs() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java index 9168c29..4d30c97 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -656,7 +656,22 @@ pendingFetchOp.canceledByReplication(); pending.remove(uri); + Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs(); fetchOp.addRefs(pendingFetchOp.getRefSpecs()); + + if (reason == RetryReason.COLLISION) { + // The fetch was never executed and is delayed + // because of a collision with an in-flight replication. + // The pending one was already in the queue and + // therefore is for sure older than fetchOp + // otherwise it would have been also rescheduled + // because of a collision. + // FetchOp has to take precedence over the pending + // operation that is for sure older, therefore its + // initial ref-specs need to be reapplied. + fetchOp.addRefs(fetchOpRefSpecs); + } + fetchOp.addStates(pendingFetchOp.getStates()); pendingFetchOp.removeStates();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java new file mode 100644 index 0000000..c5e9b5f --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationBatchRefUpdatedIT.java
@@ -0,0 +1,45 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import com.google.gerrit.acceptance.SkipProjectClone; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import java.util.List; + +@SkipProjectClone +@UseLocalDisk +@TestPlugin( + name = "pull-replication", + sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule", + httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") +public class AsyncPullReplicationBatchRefUpdatedIT extends PullReplicationITBase { + private static final int ASYNC_REPLICATION_DELAY_SEC = 2; + + @Override + protected List<String> syncRefs() { + return List.of("NONE"); + } + + @Override + protected int replicationDelaySec() { + return ASYNC_REPLICATION_DELAY_SEC; + } + + @Override + protected boolean useBatchRefUpdateEvent() { + return true; + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java new file mode 100644 index 0000000..3e481e2 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/AsyncPullReplicationRefUpdatedIT.java
@@ -0,0 +1,45 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import com.google.gerrit.acceptance.SkipProjectClone; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import java.util.List; + +@SkipProjectClone +@UseLocalDisk +@TestPlugin( + name = "pull-replication", + sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule", + httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") +public class AsyncPullReplicationRefUpdatedIT extends PullReplicationITBase { + private static final int ASYNC_REPLICATION_DELAY_SEC = 2; + + @Override + protected List<String> syncRefs() { + return List.of("NONE"); + } + + @Override + protected int replicationDelaySec() { + return ASYNC_REPLICATION_DELAY_SEC; + } + + @Override + protected boolean useBatchRefUpdateEvent() { + return false; + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java index e1010d9..84c8525 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -39,6 +39,7 @@ import com.googlesource.gerrit.plugins.replication.api.ApiModule; import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -61,6 +62,10 @@ private static final int TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC = 5; private static final String TEST_PLUGIN_NAME = "pull-replication"; + protected Duration testTimeoutDuration() { + return Duration.ofSeconds(TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC * 2); + } + public static class PullReplicationHealthCheckTestModule extends AbstractModule { private final PullReplicationModule pullReplicationModule; @@ -151,7 +156,7 @@ config.setString("remote", remoteName, "apiUrl", adminRestSession.url()); config.setString("remote", remoteName, "fetch", "+refs/*:refs/*"); config.setInt("remote", remoteName, "timeout", 600); - config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec()); project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); config.setBoolean("gerrit", null, "autoReload", true); config.setInt("replication", null, "maxApiPayloadSize", 1);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java index e3a5a28..30bff1b 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -149,7 +149,7 @@ config.setString("remote", remoteName, "apiUrl", adminRestSession.url()); config.setString("remote", remoteName, "fetch", "+refs/*:refs/*"); config.setInt("remote", remoteName, "timeout", 600); - config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec()); project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); config.setBoolean("gerrit", null, "autoReload", true); config.save();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java index f3381e6..a6058d3 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
@@ -20,6 +20,7 @@ import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow; import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS; +import com.google.gerrit.acceptance.PushOneCommit; import com.google.gerrit.acceptance.PushOneCommit.Result; import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.config.GerritConfig; @@ -66,12 +67,17 @@ config.setString("remote", remoteName, "fetch", "+refs/*:refs/*"); config.setInt("remote", remoteName, "timeout", 600); config.setBoolean("remote", remoteName, "mirror", true); - config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec()); project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); config.setBoolean("gerrit", null, "autoReload", true); + config.setStringList("replication", null, "syncRefs", syncRefs()); config.save(); } + protected List<String> syncRefs() { + return List.of(); + } + @Override public void setUpTestPlugin() throws Exception { setUpTestPlugin(false); @@ -173,6 +179,39 @@ } @Test + @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE) + public void shouldReplicateBranchUpdatedAndThenDeleted() throws Exception { + String testProjectName = project + TEST_REPLICATION_SUFFIX; + createTestProject(testProjectName); + + String master = "refs/heads/master"; + String initial = gApi.projects().name(testProjectName).branch(master).get().revision; + testRepo = cloneProject(NameKey.parse(testProjectName)); + PushOneCommit push = + pushFactory.create(admin.newIdent(), testRepo, "change1", "a.txt", "content"); + PushOneCommit.Result result = push.to("refs/heads/master"); + result.assertOkStatus(); + + String updated = gApi.projects().name(testProjectName).branch(master).get().revision; + + ReplicationQueue pullReplicationQueue = + plugin.getSysInjector().getInstance(ReplicationQueue.class); + ProjectEvent event = + generateUpdateEvent(project, master, initial, updated, TEST_REPLICATION_REMOTE); + pullReplicationQueue.onEvent(event); + + gApi.projects().name(testProjectName).branch(master).delete(); + ProjectEvent deleteEvent = + generateUpdateEvent( + project, master, updated, ObjectId.zeroId().getName(), TEST_REPLICATION_REMOTE); + pullReplicationQueue.onEvent(deleteEvent); + + try (Repository repo = repoManager.openRepository(project)) { + waitUntil(() -> checkedGetRef(repo, master) == null); + } + } + + @Test @UseLocalDisk @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE) public void shouldReplicateForceUpdatedBranch() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java index ed71545..fe5f17b 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -53,8 +53,6 @@ protected static final Optional<String> ALL_PROJECTS = Optional.empty(); protected static final FluentLogger logger = FluentLogger.forEnclosingClass(); - protected static final int TEST_REPLICATION_DELAY = 1; - protected static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000); protected static final String TEST_REPLICATION_SUFFIX = "suffix1"; protected static final String TEST_REPLICATION_REMOTE = "remote1"; @Inject protected SitePaths sitePaths; @@ -64,6 +62,14 @@ protected FileBasedConfig config; protected FileBasedConfig secureConfig; + protected int replicationDelaySec() { + return 1; + } + + protected Duration testTimeoutDuration() { + return Duration.ofSeconds(replicationDelaySec() * 2L); + } + protected abstract boolean useBatchRefUpdateEvent(); protected ProjectEvent generateUpdateEvent( @@ -134,7 +140,7 @@ } protected void waitUntil(Supplier<Boolean> waitCondition) throws Exception { - WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); + WaitUtil.waitUntil(waitCondition, testTimeoutDuration()); } protected <T> T getInstance(Class<T> classObj) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java index d0c25e4..c9ef1b3 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
@@ -50,7 +50,7 @@ config.setString("remote", remoteName, "apiUrl", adminRestSession.url()); config.setString("remote", remoteName, "fetch", "+refs/*:refs/*"); config.setInt("remote", remoteName, "timeout", 600); - config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.setInt("remote", remoteName, "replicationDelay", replicationDelaySec()); project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); config.setBoolean("gerrit", null, "autoReload", true); config.save();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java index f51376d..8aedb52 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
@@ -79,7 +79,7 @@ assertThat(targetChangeRef.getObjectId()).isEqualTo(changeCommit.getId()); // ensure that previous fetch was finished - Thread.sleep(Duration.ofSeconds(TEST_REPLICATION_DELAY).toMillis()); + Thread.sleep(Duration.ofSeconds(replicationDelaySec()).toMillis()); Ref sourceNewRef = createNewRef();