Implement periodic fetch for source When `remote.NAME.fetchEvery` is configured to value greater than `0` for a source then all its projects are fetched using a dedicated (called `PeriodicallyFetchFromSources`) pool (with a single thread). Notes: * dedicated pool is created only when at least single resource is configured to fetch periodically from the remote * the check is performed on pull-replication plugin start therefore plugin should be restarted in case when configuration is modified * all refs (according to `remote.NAME.fetch` spec) are fetched for each repository Bug: Issue 322146240 Change-Id: I4ff6ca67ec4005710c28f6b9cee08d584da03936
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java index 6457f8a..0a75c86 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication.pull; +import com.google.common.base.Suppliers; import com.google.common.util.concurrent.Atomics; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicItem; @@ -22,11 +23,13 @@ import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; +import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.ReplicationConfig; import com.googlesource.gerrit.plugins.replication.ReplicationFilter; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; public class OnStartStop implements LifecycleListener { private final AtomicReference<Future<?>> fetchAllFuture; @@ -37,6 +40,7 @@ private final ReplicationState.Factory replicationStateFactory; private final SourcesCollection sourcesCollection; private final WorkQueue workQueue; + private final Supplier<SourcesFetchPeriodically> fetchAllPeriodically; private boolean isReplica; @Inject @@ -48,6 +52,7 @@ ReplicationState.Factory replicationStateFactory, SourcesCollection sourcesCollection, WorkQueue workQueue, + Provider<SourcesFetchPeriodically> fetchAllPeriodically, @GerritIsReplica Boolean isReplica) { this.srvInfo = srvInfo; this.fetchAll = fetchAll; @@ -58,10 +63,14 @@ this.sourcesCollection = sourcesCollection; this.workQueue = workQueue; this.isReplica = isReplica; + this.fetchAllPeriodically = Suppliers.memoize(() -> fetchAllPeriodically.get()); } @Override public void start() { + sourcesCollection.startup(workQueue); + fetchAllPeriodically.get().start(); + if (isReplica && srvInfo.getState() == ServerInformation.State.STARTUP && config.isReplicateAllOnPluginStart()) { @@ -73,12 +82,12 @@ .create(null, ReplicationFilter.all(), state, false) .schedule(30, TimeUnit.SECONDS)); } - - sourcesCollection.startup(workQueue); } @Override public void stop() { + fetchAllPeriodically.get().stop(); + Future<?> f = fetchAllFuture.getAndSet(null); if (f != null) { f.cancel(true);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java index ddc6258..a4af8ca 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -137,6 +137,8 @@ install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig)); } + install(new FactoryModuleBuilder().build(SourceFetchPeriodically.Factory.class)); + DynamicSet.setOf(binder(), ReplicationStateListener.class); DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class); EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java index 9283033..922ce32 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -448,6 +448,15 @@ return schedule(project, ref, uri, state, apiRequestMetrics, false); } + public Future<?> scheduleNow( + Project.NameKey project, + String ref, + ReplicationState state, + Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) { + URIish uri = getURI(project); + return schedule(project, ref, uri, state, apiRequestMetrics, true); + } + public Future<?> schedule( Project.NameKey project, String ref, @@ -917,6 +926,10 @@ return config.enableBatchedRefs(); } + public long fetchEvery() { + return config.fetchEvery(); + } + void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) { try { URIish apiURI = new URIish(apiUrl);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java new file mode 100644 index 0000000..2c2f6d8 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -0,0 +1,80 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.server.events.EventDispatcher; +import com.google.gerrit.server.project.ProjectCache; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.assistedinject.Assisted; +import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class SourceFetchPeriodically { + interface Factory { + SourceFetchPeriodically create(Source source); + } + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final ProjectCache projects; + private final ReplicationState.Factory fetchReplicationFactory; + private final Provider<PullReplicationApiRequestMetrics> metricsProvider; + private final DynamicItem<EventDispatcher> eventDispatcher; + private final Source source; + + @Inject + SourceFetchPeriodically( + ProjectCache projects, + ReplicationState.Factory fetchReplicationFactory, + Provider<PullReplicationApiRequestMetrics> metricsProvider, + DynamicItem<EventDispatcher> eventDispatcher, + @Assisted Source source) { + this.projects = projects; + this.fetchReplicationFactory = fetchReplicationFactory; + this.metricsProvider = metricsProvider; + this.eventDispatcher = eventDispatcher; + this.source = source; + } + + ScheduledFuture<?> start(ScheduledExecutorService pool) { + return pool.scheduleAtFixedRate( + this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS); + } + + private void scheduleFetchAll() { + Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get()); + long repositoriesToBeFetched = + projects.all().stream() + .filter(source::wouldFetchProject) + .map( + projectToFetch -> + source.scheduleNow( + projectToFetch, + FetchOne.ALL_REFS, + fetchReplicationFactory.create( + new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get())), + metrics)) + .count(); + logger.atInfo().log( + "The %d repositories were scheduled for %s remote to fetch %s", + repositoriesToBeFetched, source.getRemoteConfigName(), FetchOne.ALL_REFS); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java new file mode 100644 index 0000000..b405a6e --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -0,0 +1,77 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED; +import static java.util.stream.Collectors.toList; + +import com.google.common.base.Suppliers; +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Supplier; + +@Singleton +class SourcesFetchPeriodically { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final WorkQueue workQueue; + private final Provider<SourcesCollection> sources; + private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator; + private final List<ScheduledFuture<?>> scheduled; + + @Inject + SourcesFetchPeriodically( + WorkQueue workQueue, + Provider<SourcesCollection> sources, + Provider<SourceFetchPeriodically.Factory> fetchAllCreator) { + this.workQueue = workQueue; + this.sources = sources; + this.fetchAllCreator = fetchAllCreator; + this.scheduled = new ArrayList<>(); + } + + void start() { + scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get())); + } + + private List<ScheduledFuture<?>> scheduleFetchAll( + WorkQueue workQueue, + SourcesCollection sources, + SourceFetchPeriodically.Factory fetchAllCreator) { + Supplier<ScheduledExecutorService> queue = + Suppliers.memoize(() -> workQueue.createQueue(1, "PeriodicallyFetchFromSources")); + return sources.getAll().stream() + .filter(source -> source.fetchEvery() > DEFAULT_PERIODIC_FETCH_DISABLED) + .map( + source -> { + logger.atInfo().log( + "Enabling periodic (every %ds) fetch of all refs for [%s] remote", + source.fetchEvery(), source.getRemoteConfigName()); + return fetchAllCreator.create(source).start(queue.get()); + }) + .collect(toList()); + } + + void stop() { + scheduled.forEach(schedule -> schedule.cancel(true)); + } +}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 5e791d4..1635e5c 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -631,6 +631,13 @@ > together with `remote.NAME.apiUrl`; such configuration is considered > invalid and prevents the plugin from starting +> *NOTE*: Periodic fetches are scheduled using a dedicated (single +> threaded) pool, called `PeriodicallyFetchFromSources`. It is created only +> when there is at least one remote configured to fetch periodically. + +> *NOTE*: Scheduling is performed on plugin start therefore one needs to +> reload plugin when configuration gets changed. + Directory `replication` -------------------- The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java new file mode 100644 index 0000000..5ac1deb --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyIT.java
@@ -0,0 +1,91 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.SkipProjectClone; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.acceptance.config.GerritConfig; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.junit.Test; + +@SkipProjectClone +@UseLocalDisk +@TestPlugin( + name = "pull-replication", + sysModule = "com.googlesource.gerrit.plugins.replication.pull.TestPullReplicationModule", + httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule") +public class SourcesFetchPeriodicallyIT extends PullReplicationSetupBase { + private static final String TEST_FETCH_FREQUENCY = "1s"; + + @Override + protected boolean useBatchRefUpdateEvent() { + return false; + } + + @Override + protected void setReplicationSource( + String remoteName, List<String> replicaSuffixes, Optional<String> project) + throws IOException { + List<String> fetchUrls = + buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString()); + config.setStringList("remote", remoteName, "url", fetchUrls); + project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj)); + config.setString("remote", remoteName, "fetchEvery", TEST_FETCH_FREQUENCY); + config.save(); + } + + @Override + public void setUpTestPlugin() throws Exception { + setUpTestPlugin(false); + } + + @Test + @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE) + public void shouldFetchChangesPeriodically() throws Exception { + testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX)); + + Result pushChangeResult = createChange(); + RevCommit changeCommit = pushChangeResult.getCommit(); + String sourceChangeRef = pushChangeResult.getPatchSet().refName(); + + try (Repository repo = repoManager.openRepository(project)) { + waitUntil(() -> checkedGetRef(repo, sourceChangeRef) != null); + + Ref targetChangeRef = getRef(repo, sourceChangeRef); + assertThat(targetChangeRef).isNotNull(); + assertThat(targetChangeRef.getObjectId()).isEqualTo(changeCommit.getId()); + + // ensure that previous fetch was finished + Thread.sleep(Duration.ofSeconds(TEST_REPLICATION_DELAY).toMillis()); + + Ref sourceNewRef = createNewRef(); + + waitUntil(() -> checkedGetRef(repo, sourceNewRef.getName()) != null); + Ref targetNewRef = getRef(repo, sourceNewRef.getName()); + assertThat(targetNewRef).isNotNull(); + assertThat(targetNewRef.getObjectId()).isEqualTo(sourceNewRef.getObjectId()); + } + } +}