Use `batch-ref-updated` stream event to trigger apply-object/fetch Idd4410543 Change introduced new stream event which contains multiple refs updated together during the BatchUpdate. Use this new event instead of `ref-updated` which contains information about single ref. This allows to control the sequence of apply-objects/fetch operation. Also allows to implement as follow up change single apply-object/fetch operation that contains data about all refs from a single batch update. Bug: Issue 40015567 Change-Id: I82312acfb82f32204c6e422b27234ad216cb36df
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java index 13484b1..bcbd73d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -26,6 +26,9 @@ import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.metrics.Timer1.Context; import com.google.gerrit.server.config.GerritInstanceId; +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.gerrit.server.data.RefUpdateAttribute; +import com.google.gerrit.server.events.BatchRefUpdateEvent; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.events.EventListener; import com.google.gerrit.server.events.RefUpdatedEvent; @@ -59,6 +62,7 @@ import org.eclipse.jgit.errors.InvalidObjectIdException; import org.eclipse.jgit.errors.MissingObjectException; import org.eclipse.jgit.errors.RepositoryNotFoundException; +import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; import org.slf4j.Logger; @@ -75,6 +79,7 @@ static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME); private static final Integer DEFAULT_FETCH_CALLS_TIMEOUT = 0; + private static final String BATCH_REF_UPDATED_EVENT_TYPE = BatchRefUpdateEvent.TYPE; private static final String REF_UDPATED_EVENT_TYPE = new RefUpdatedEvent().type; private static final String ZEROS_OBJECTID = ObjectId.zeroId().getName(); private final ReplicationStateListener stateLog; @@ -92,6 +97,7 @@ private final ApplyObjectMetrics applyObjectMetrics; private final FetchReplicationMetrics fetchMetrics; private final String instanceId; + private final boolean useBatchUpdateEvents; private ApplyObjectsRefsFilter applyObjectsRefsFilter; @Inject @@ -106,6 +112,7 @@ ApplyObjectMetrics applyObjectMetrics, FetchReplicationMetrics fetchMetrics, @GerritInstanceId String instanceId, + @GerritServerConfig Config gerritConfig, ApplyObjectsRefsFilter applyObjectsRefsFilter) { workQueue = wq; dispatcher = dis; @@ -118,6 +125,8 @@ this.applyObjectMetrics = applyObjectMetrics; this.fetchMetrics = fetchMetrics; this.instanceId = instanceId; + this.useBatchUpdateEvents = + gerritConfig.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false); this.applyObjectsRefsFilter = applyObjectsRefsFilter; } @@ -158,22 +167,46 @@ @Override public void onEvent(com.google.gerrit.server.events.Event e) { - if (e.type.equals(REF_UDPATED_EVENT_TYPE) && instanceId.equals(e.instanceId)) { + if (!instanceId.equals(e.instanceId)) { + return; + } + + if (useBatchUpdateEvents) { + if (e.type.equals(BATCH_REF_UPDATED_EVENT_TYPE)) { + BatchRefUpdateEvent event = (BatchRefUpdateEvent) e; + + event.refUpdates.get().stream() + .sorted(ReplicationQueue::sortByMetaRefAsLast) + .forEachOrdered( + refUpdateAttribute -> { + if (isRefToBeReplicated(refUpdateAttribute.refName)) { + fireRefUpdate(refUpdateAttribute, e.eventCreatedOn); + } + }); + } + return; + } + + if (e.type.equals(REF_UDPATED_EVENT_TYPE)) { RefUpdatedEvent event = (RefUpdatedEvent) e; if (isRefToBeReplicated(event.getRefName())) { - repLog.info( - "Ref event received: {} on project {}:{} - {} => {}", - refUpdateType(event), - event.refUpdate.get().project, - event.getRefName(), - event.refUpdate.get().oldRev, - event.refUpdate.get().newRev); - fire(ReferenceUpdatedEvent.from(event)); + fireRefUpdate(event.refUpdate.get(), event.eventCreatedOn); } } } + private void fireRefUpdate(RefUpdateAttribute refUpdate, long eventCreatedOn) { + repLog.info( + "Ref event received: {} on project {}:{} - {} => {}", + refUpdateType(refUpdate.oldRev, refUpdate.newRev), + refUpdate.project, + refUpdate.refName, + refUpdate.oldRev, + refUpdate.newRev); + fire(ReferenceUpdatedEvent.from(refUpdate, eventCreatedOn)); + } + @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey project = Project.nameKey(event.getProjectName()); @@ -184,10 +217,16 @@ source.getApis().forEach(apiUrl -> source.scheduleDeleteProject(apiUrl, project))); } - private static String refUpdateType(RefUpdatedEvent event) { - if (ZEROS_OBJECTID.equals(event.refUpdate.get().oldRev)) { + private static int sortByMetaRefAsLast(RefUpdateAttribute a, RefUpdateAttribute b) { + repLog.debug("sortByMetaRefAsLast({} <=> {})", a.refName, b.refName); + return Boolean.compare( + RefNames.isNoteDbMetaRef(a.refName), RefNames.isNoteDbMetaRef(b.refName)); + } + + private static String refUpdateType(String oldRev, String newRev) { + if (ZEROS_OBJECTID.equals(oldRev)) { return "CREATE"; - } else if (ZEROS_OBJECTID.equals(event.refUpdate.get().newRev)) { + } else if (ZEROS_OBJECTID.equals(newRev)) { return "DELETE"; } else { return "UPDATE"; @@ -550,13 +589,13 @@ projectName, refName, objectId, eventCreatedOn, isDelete); } - static ReferenceUpdatedEvent from(RefUpdatedEvent event) { + static ReferenceUpdatedEvent from(RefUpdateAttribute refUpdateAttribute, long eventCreatedOn) { return ReferenceUpdatedEvent.create( - event.refUpdate.get().project, - event.getRefName(), - ObjectId.fromString(event.refUpdate.get().newRev), - event.eventCreatedOn, - ZEROS_OBJECTID.equals(event.refUpdate.get().newRev)); + refUpdateAttribute.project, + refUpdateAttribute.refName, + ObjectId.fromString(refUpdateAttribute.newRev), + eventCreatedOn, + ZEROS_OBJECTID.equals(refUpdateAttribute.newRev)); } public abstract String projectName();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java deleted file mode 100644 index 1472be2..0000000 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java +++ /dev/null
@@ -1,37 +0,0 @@ -// Copyright (C) 2020 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication.pull; - -import com.google.common.base.Suppliers; -import com.google.gerrit.entities.Project; -import com.google.gerrit.server.data.RefUpdateAttribute; -import com.google.gerrit.server.events.RefUpdatedEvent; - -public class FakeGitReferenceUpdatedEvent extends RefUpdatedEvent { - FakeGitReferenceUpdatedEvent( - Project.NameKey project, - String ref, - String oldObjectId, - String newObjectId, - String instanceId) { - RefUpdateAttribute upd = new RefUpdateAttribute(); - upd.newRev = newObjectId; - upd.oldRev = oldObjectId; - upd.project = project.get(); - upd.refName = ref; - this.refUpdate = Suppliers.ofInstance(upd); - this.instanceId = instanceId; - } -}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java similarity index 90% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java index be017d0..139ddec 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
@@ -28,10 +28,15 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { +public class PullReplicationBatchRefUpdatedAsyncIT extends PullReplicationITAbstract { @Inject private SitePaths sitePaths; @Override + protected boolean useBatchRefUpdateEvent() { + return true; + } + + @Override public void setUpTestPlugin() throws Exception { FileBasedConfig config = new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java index be017d0..2e727d1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +24,10 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationBatchRefUpdatedIT extends PullReplicationITBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return true; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java similarity index 74% rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java index 4ede1ae..4349cde 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
@@ -17,27 +17,16 @@ import static com.google.common.truth.Truth.assertThat; import static java.util.stream.Collectors.toList; -import com.google.common.flogger.FluentLogger; -import com.google.gerrit.acceptance.LightweightPluginDaemonTest; import com.google.gerrit.acceptance.PushOneCommit.Result; -import com.google.gerrit.acceptance.SkipProjectClone; -import com.google.gerrit.acceptance.TestPlugin; -import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.config.GerritConfig; -import com.google.gerrit.acceptance.testsuite.project.ProjectOperations; -import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.api.projects.BranchInput; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; +import com.google.gerrit.server.events.ProjectEvent; import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.Supplier; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; @@ -46,33 +35,14 @@ import org.eclipse.jgit.util.FS; import org.junit.Test; -@SkipProjectClone -@UseLocalDisk -@TestPlugin( - name = "pull-replication", - sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", - httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationFanoutConfigIT extends LightweightPluginDaemonTest { - private static final Optional<String> ALL_PROJECTS = Optional.empty(); - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); +abstract class PullReplicationFanoutConfigBase extends PullReplicationSetupBase { private static final int TEST_REPLICATION_DELAY = 60; - private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2); - private static final String TEST_REPLICATION_SUFFIX = "suffix1"; - private static final String TEST_REPLICATION_REMOTE = "remote1"; - @Inject private SitePaths sitePaths; - @Inject private ProjectOperations projectOperations; - private Path gitPath; - private FileBasedConfig config; private FileBasedConfig remoteConfig; - private FileBasedConfig secureConfig; @Override public void setUpTestPlugin() throws Exception { gitPath = sitePaths.site_path.resolve("git"); - - config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); remoteConfig = new FileBasedConfig( sitePaths @@ -81,17 +51,8 @@ .toFile(), FS.DETECTED); - setReplicationSource( - TEST_REPLICATION_REMOTE); // Simulates a full replication.config initialization - setRemoteConfig(TEST_REPLICATION_SUFFIX, ALL_PROJECTS); - - secureConfig = - new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED); - setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword()); - secureConfig.save(); - - super.setUpTestPlugin(); + super.setUpTestPlugin(false); } @Test @@ -104,8 +65,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -140,8 +101,8 @@ RevCommit sourceCommit = pushResult.getCommit(); final String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -173,8 +134,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -251,17 +212,11 @@ waitUntil(() -> sources.getAll().size() == 1); } - private Ref getRef(Repository repo, String branchName) throws IOException { - return repo.getRefDatabase().exactRef(branchName); - } - - private Ref checkedGetRef(Repository repo, String branchName) { - try { - return repo.getRefDatabase().exactRef(branchName); - } catch (Exception e) { - logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo); - return null; - } + @Override + protected void setReplicationSource( + String remoteName, List<String> replicaSuffixes, Optional<String> project) + throws IOException { + setReplicationSource(remoteName); } private void setReplicationSource(String remoteName) throws IOException { @@ -294,23 +249,4 @@ project.ifPresent(prj -> remoteConfig.setString("remote", null, "projects", prj)); remoteConfig.save(); } - - private void setReplicationCredentials(String remoteName, String username, String password) - throws IOException { - secureConfig.setString("remote", remoteName, "username", username); - secureConfig.setString("remote", remoteName, "password", password); - secureConfig.save(); - } - - private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { - WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); - } - - private <T> T getInstance(Class<T> classObj) { - return plugin.getSysInjector().getInstance(classObj); - } - - private Project.NameKey createTestProject(String name) throws Exception { - return projectOperations.newProject().name(name).create(); - } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java index be017d0..7babf34 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,16 +11,11 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.googlesource.gerrit.plugins.replication.pull; import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +23,11 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationFanoutConfigBatchRefUpdateEventIT + extends PullReplicationFanoutConfigBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return true; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java index be017d0..16b0b02 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,16 +11,11 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package com.googlesource.gerrit.plugins.replication.pull; import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +23,10 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationFanoutConfigRefUpdatedEventIT extends PullReplicationFanoutConfigBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return false; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java index ba812e2..50aa1b2 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -31,6 +31,7 @@ import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.server.events.ProjectEvent; import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator; import java.io.IOException; import java.util.Collection; @@ -83,8 +84,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -116,8 +117,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -159,8 +160,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -185,8 +186,8 @@ assertThat(pushedRefs).hasSize(1); assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK); - FakeGitReferenceUpdatedEvent forcedPushEvent = - new FakeGitReferenceUpdatedEvent( + ProjectEvent forcedPushEvent = + generateUpdateEvent( project, newBranch, branchRevision, @@ -224,8 +225,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -265,8 +266,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -357,8 +358,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java similarity index 93% rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java index 5d4aeb4..e76faf5 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
@@ -21,8 +21,6 @@ import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS; import com.google.gerrit.acceptance.PushOneCommit.Result; -import com.google.gerrit.acceptance.SkipProjectClone; -import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.config.GerritConfig; import com.google.gerrit.entities.Permission; @@ -33,6 +31,7 @@ import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.server.events.ProjectEvent; import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator; import java.io.IOException; import java.util.Collection; @@ -51,13 +50,7 @@ import org.junit.Ignore; import org.junit.Test; -@SkipProjectClone -@UseLocalDisk -@TestPlugin( - name = "pull-replication", - sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", - httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationIT extends PullReplicationSetupBase { +abstract class PullReplicationITBase extends PullReplicationSetupBase { @Override protected void setReplicationSource( @@ -90,8 +83,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -123,8 +116,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -166,8 +159,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -192,8 +185,8 @@ assertThat(pushedRefs).hasSize(1); assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK); - FakeGitReferenceUpdatedEvent forcedPushEvent = - new FakeGitReferenceUpdatedEvent( + ProjectEvent forcedPushEvent = + generateUpdateEvent( project, newBranch, branchRevision, @@ -231,8 +224,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(), @@ -272,8 +265,8 @@ ReplicationQueue pullReplicationQueue = plugin.getSysInjector().getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, newBranch, ObjectId.zeroId().getName(), @@ -363,8 +356,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java similarity index 90% rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java index be017d0..2ca7538 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
@@ -28,10 +28,15 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { +public class PullReplicationRefUpdatedAsyncIT extends PullReplicationITAbstract { @Inject private SitePaths sitePaths; @Override + protected boolean useBatchRefUpdateEvent() { + return false; + } + + @Override public void setUpTestPlugin() throws Exception { FileBasedConfig config = new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java index be017d0..6e7c369 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +24,10 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationRefUpdatedIT extends PullReplicationITBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return false; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java index e07d481..1ef4d35 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -16,13 +16,20 @@ import static java.util.stream.Collectors.toList; +import com.google.common.base.Suppliers; import com.google.common.flogger.FluentLogger; import com.google.gerrit.acceptance.LightweightPluginDaemonTest; import com.google.gerrit.acceptance.testsuite.project.ProjectOperations; +import com.google.gerrit.entities.Project; import com.google.gerrit.entities.Project.NameKey; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.data.AccountAttribute; +import com.google.gerrit.server.data.RefUpdateAttribute; +import com.google.gerrit.server.events.BatchRefUpdateEvent; +import com.google.gerrit.server.events.ProjectEvent; +import com.google.gerrit.server.events.RefUpdatedEvent; import com.google.inject.Inject; import java.io.File; import java.io.IOException; @@ -54,6 +61,22 @@ protected FileBasedConfig config; protected FileBasedConfig secureConfig; + protected abstract boolean useBatchRefUpdateEvent(); + + protected ProjectEvent generateUpdateEvent( + Project.NameKey project, + String ref, + String oldObjectId, + String newObjectId, + String instanceId) { + + if (useBatchRefUpdateEvent()) { + return generateBatchRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId); + } + + return generateRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId); + } + protected void setUpTestPlugin(boolean loadExisting) throws Exception { gitPath = sitePaths.site_path.resolve("git"); @@ -73,6 +96,9 @@ setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword()); secureConfig.save(); + cfg.setBoolean( + "event", "stream-events", "enableBatchRefUpdatedEvents", useBatchRefUpdateEvent()); + super.setUpTestPlugin(); } @@ -121,4 +147,38 @@ List<String> replicaSuffixes, Function<String, String> toURL) { return replicaSuffixes.stream().map(suffix -> toURL.apply(suffix)).collect(toList()); } + + private BatchRefUpdateEvent generateBatchRefUpdateEvent( + Project.NameKey project, + String ref, + String oldObjectId, + String newObjectId, + String instanceId) { + RefUpdateAttribute upd = new RefUpdateAttribute(); + upd.newRev = newObjectId; + upd.oldRev = oldObjectId; + upd.project = project.get(); + upd.refName = ref; + BatchRefUpdateEvent event = + new BatchRefUpdateEvent( + project, + Suppliers.ofInstance(List.of(upd)), + Suppliers.ofInstance(new AccountAttribute(admin.id().get()))); + event.instanceId = instanceId; + return event; + } + + private ProjectEvent generateRefUpdateEvent( + NameKey project, String ref, String oldObjectId, String newObjectId, String instanceId) { + RefUpdateAttribute upd = new RefUpdateAttribute(); + upd.newRev = newObjectId; + upd.oldRev = oldObjectId; + upd.project = project.get(); + upd.refName = ref; + RefUpdatedEvent event = new RefUpdatedEvent(); + event.refUpdate = Suppliers.ofInstance(upd); + event.submitter = Suppliers.ofInstance(new AccountAttribute(admin.id().get())); + event.instanceId = instanceId; + return event; + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java similarity index 94% rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java index e55e383..2e95ef1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
@@ -21,6 +21,7 @@ import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.config.GerritConfig; +import com.google.gerrit.server.events.ProjectEvent; import java.io.IOException; import java.util.List; import java.util.Optional; @@ -36,7 +37,8 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationWithGitHttpTransportProtocolIT extends PullReplicationSetupBase { +public abstract class PullReplicationWithGitHttpTransportProtocolBase + extends PullReplicationSetupBase { @Override protected void setReplicationSource( @@ -80,8 +82,8 @@ String sourceRef = pushResult.getPatchSet().refName(); ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class); - FakeGitReferenceUpdatedEvent event = - new FakeGitReferenceUpdatedEvent( + ProjectEvent event = + generateUpdateEvent( project, sourceRef, ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java index be017d0..8c8ca37 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +24,11 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT + extends PullReplicationWithGitHttpTransportProtocolBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return true; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java similarity index 62% copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java index be017d0..5f3c7b6 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
@@ -1,4 +1,4 @@ -// Copyright (C) 2022 The Android Open Source Project +// Copyright (C) 2023 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,10 +17,6 @@ import com.google.gerrit.acceptance.SkipProjectClone; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; -import com.google.gerrit.server.config.SitePaths; -import com.google.inject.Inject; -import org.eclipse.jgit.storage.file.FileBasedConfig; -import org.eclipse.jgit.util.FS; @SkipProjectClone @UseLocalDisk @@ -28,16 +24,11 @@ name = "pull-replication", sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule", httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") -public class PullReplicationAsyncIT extends PullReplicationITAbstract { - @Inject private SitePaths sitePaths; +public class PullReplicationWithGitHttpTransportProtocolRefUpdatedIT + extends PullReplicationWithGitHttpTransportProtocolBase { @Override - public void setUpTestPlugin() throws Exception { - FileBasedConfig config = - new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); - config.setString("replication", null, "syncRefs", "^$"); - config.save(); - - super.setUpTestPlugin(true); + protected boolean useBatchRefUpdateEvent() { + return false; } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java index 15e2c1b..4f853a2 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -34,12 +35,13 @@ import com.google.common.collect.Lists; import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.api.changes.NotifyHandling; -import com.google.gerrit.extensions.common.AccountInfo; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.metrics.DisabledMetricMaker; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.data.AccountAttribute; import com.google.gerrit.server.data.RefUpdateAttribute; +import com.google.gerrit.server.events.BatchRefUpdateEvent; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.events.RefUpdatedEvent; @@ -59,7 +61,9 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.eclipse.jgit.errors.LargeObjectException; +import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.util.FS; @@ -68,6 +72,7 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -78,6 +83,10 @@ private static final String FOREIGN_INSTANCE_ID = "any other instance id"; private static final String TEST_REF_NAME = "refs/meta/heads/anyref"; + private static final Project.NameKey PROJECT = Project.nameKey("defaultProject"); + private static final String NEW_OBJECT_ID = + ObjectId.fromString("3c1ddc050d7906adb0e29bc3bc46af8749b2f63b").getName(); + @Mock private WorkQueue wq; @Mock private Source source; @Mock private SourcesCollection sourceCollection; @@ -86,7 +95,7 @@ @Mock ReplicationStateListeners sl; @Mock FetchRestApiClient fetchRestApiClient; @Mock FetchApiClient.Factory fetchClientFactory; - @Mock AccountInfo accountInfo; + @Mock AccountAttribute accountAttribute; @Mock RevisionReader revReader; @Mock RevisionData revisionData; @Mock HttpResult successfulHttpResult; @@ -95,6 +104,8 @@ List<ObjectId> revisionDataParentObjectIds; @Mock HttpResult httpResult; @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter; + + @Mock Config config; ApplyObjectMetrics applyObjectMetrics; FetchReplicationMetrics fetchMetrics; @@ -121,6 +132,8 @@ when(source.getApis()).thenReturn(apis); when(sourceCollection.getAll()).thenReturn(Lists.newArrayList(source)); when(rd.get()).thenReturn(sourceCollection); + when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false)) + .thenReturn(true); lenient() .when(revReader.read(any(), any(), anyString(), eq(0))) .thenReturn(Optional.of(revisionData)); @@ -169,11 +182,39 @@ applyObjectMetrics, fetchMetrics, LOCAL_INSTANCE_ID, + config, applyObjectsRefsFilter); } @Test public void shouldCallSendObjectWhenMetaRef() throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); + objectUnderTest.start(); + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + } + + @Test + public void shouldCallSendObjectWhenMetaRefAndRefUpdateEvent() throws IOException { + when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false)) + .thenReturn(false); + + objectUnderTest = + new ReplicationQueue( + wq, + rd, + dis, + sl, + fetchClientFactory, + refsFilter, + () -> revReader, + applyObjectMetrics, + fetchMetrics, + LOCAL_INSTANCE_ID, + config, + applyObjectsRefsFilter); + Event event = new TestEvent("refs/changes/01/1/meta"); objectUnderTest.start(); objectUnderTest.onEvent(event); @@ -183,7 +224,7 @@ @Test public void shouldIgnoreEventWhenIsNotLocalInstanceId() throws IOException { - Event event = new TestEvent(); + Event event = generateBatchRefUpdateEvent(TEST_REF_NAME); event.instanceId = FOREIGN_INSTANCE_ID; objectUnderTest.start(); objectUnderTest.onEvent(event); @@ -194,7 +235,7 @@ @Test public void shouldCallInitProjectWhenProjectIsMissing() throws IOException { - Event event = new TestEvent("refs/changes/01/1/meta"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); when(httpResult.isSuccessful()).thenReturn(false); when(httpResult.isProjectMissing(any())).thenReturn(true); when(source.isCreateMissingRepositories()).thenReturn(true); @@ -206,8 +247,24 @@ } @Test + public void shouldCallSendObjectReorderingRefsHavingMetaAtTheEnd() throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta", "refs/changes/01/1/1"); + objectUnderTest.start(); + objectUnderTest.onEvent(event); + verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta"); + } + + @Test + public void shouldCallSendObjectKeepingMetaAtTheEnd() throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/01/1/meta"); + objectUnderTest.start(); + objectUnderTest.onEvent(event); + verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta"); + } + + @Test public void shouldNotCallInitProjectWhenReplicateNewRepositoriesNotSet() throws IOException { - Event event = new TestEvent("refs/changes/01/1/meta"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); when(httpResult.isSuccessful()).thenReturn(false); when(httpResult.isProjectMissing(any())).thenReturn(true); when(source.isCreateMissingRepositories()).thenReturn(false); @@ -219,7 +276,25 @@ } @Test - public void shouldCallSendObjectWhenPatchSetRef() throws IOException { + public void shouldCallSendObjectWhenPatchSetRefAndRefUpdateEvent() throws IOException { + when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false)) + .thenReturn(false); + + objectUnderTest = + new ReplicationQueue( + wq, + rd, + dis, + sl, + fetchClientFactory, + refsFilter, + () -> revReader, + applyObjectMetrics, + fetchMetrics, + LOCAL_INSTANCE_ID, + config, + applyObjectsRefsFilter); + Event event = new TestEvent("refs/changes/01/1/1"); objectUnderTest.start(); objectUnderTest.onEvent(event); @@ -228,9 +303,18 @@ } @Test + public void shouldCallSendObjectWhenPatchSetRef() throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); + objectUnderTest.start(); + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + } + + @Test public void shouldFallbackToCallFetchWhenIOException() throws IOException, LargeObjectException, RefUpdateException { - Event event = new TestEvent("refs/changes/01/1/meta"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); objectUnderTest.start(); when(revReader.read(any(), any(), anyString(), anyInt())).thenThrow(IOException.class); @@ -243,7 +327,7 @@ @Test public void shouldFallbackToCallFetchWhenLargeRef() throws IOException, LargeObjectException, RefUpdateException { - Event event = new TestEvent("refs/changes/01/1/1"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); objectUnderTest.start(); when(revReader.read(any(), any(), anyString(), anyInt())).thenReturn(Optional.empty()); @@ -255,7 +339,7 @@ @Test public void shouldFallbackToCallFetchWhenParentObjectIsMissing() throws IOException { - Event event = new TestEvent("refs/changes/01/1/1"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); objectUnderTest.start(); when(httpResult.isSuccessful()).thenReturn(false); @@ -271,7 +355,7 @@ @Test public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnMetaRef() throws IOException { - Event event = new TestEvent("refs/changes/01/1/meta"); + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); objectUnderTest.start(); when(httpResult.isSuccessful()).thenReturn(false, true); @@ -298,7 +382,7 @@ public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnAllowedRefs() throws IOException { String refName = "refs/tags/test-tag"; - Event event = new TestEvent(refName); + Event event = generateBatchRefUpdateEvent(refName); objectUnderTest.start(); when(httpResult.isSuccessful()).thenReturn(false, true); @@ -343,19 +427,20 @@ applyObjectMetrics, fetchMetrics, LOCAL_INSTANCE_ID, + config, applyObjectsRefsFilter); - Event event = new TestEvent("refs/multi-site/version"); + Event event = generateBatchRefUpdateEvent("refs/multi-site/version"); objectUnderTest.onEvent(event); - verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo); + verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute); } @Test public void shouldSkipEventWhenStarredChangesRef() { - Event event = new TestEvent("refs/starred-changes/41/2941/1000000"); + Event event = generateBatchRefUpdateEvent("refs/starred-changes/41/2941/1000000"); objectUnderTest.onEvent(event); - verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo); + verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute); } @Test @@ -418,6 +503,38 @@ return createTempDirectory(prefix); } + private BatchRefUpdateEvent generateBatchRefUpdateEvent(String... refs) { + List<RefUpdateAttribute> refUpdates = + Arrays.stream(refs) + .map( + ref -> { + RefUpdateAttribute upd = new RefUpdateAttribute(); + upd.newRev = NEW_OBJECT_ID; + upd.oldRev = ObjectId.zeroId().getName(); + upd.project = PROJECT.get(); + upd.refName = ref; + return upd; + }) + .collect(Collectors.toList()); + + BatchRefUpdateEvent event = + new BatchRefUpdateEvent( + PROJECT, Suppliers.ofInstance(refUpdates), Suppliers.ofInstance(accountAttribute)); + event.instanceId = LOCAL_INSTANCE_ID; + return event; + } + + private void verifySendObjectOrdering(String firstRef, String secondRef) throws IOException { + InOrder inOrder = inOrder(fetchRestApiClient); + + inOrder + .verify(fetchRestApiClient) + .callSendObjects(any(), eq(firstRef), anyLong(), any(), any()); + inOrder + .verify(fetchRestApiClient) + .callSendObjects(any(), eq(secondRef), anyLong(), any(), any()); + } + private class TestEvent extends RefUpdatedEvent { public TestEvent() {