Ensure that replicateOnStartup doesn't compete with fetch When `gerrit.replicateOnStartup` is set to `true` then corresponding projects will be fetched. Ensure that projects that are subject of periodic fetch are excluded from the `replicateOnStartup` schedule as they are going to be fetched anyway. Bug: Issue 322146240 Change-Id: I29c6b12fc12ba0860f43a7fb2d116faa2482a1fa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java index 0a75c86..6ac09f6 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -25,7 +25,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.ReplicationConfig; -import com.googlesource.gerrit.plugins.replication.ReplicationFilter; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -79,7 +78,11 @@ new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get())); fetchAllFuture.set( fetchAll - .create(null, ReplicationFilter.all(), state, false) + .create( + null, + fetchAllPeriodically.get().skipFromReplicateAllOnPluginStart(), + state, + false) .schedule(30, TimeUnit.SECONDS)); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java index 2c2f6d8..264c880 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceFetchPeriodically.java
@@ -15,6 +15,7 @@ package com.googlesource.gerrit.plugins.replication.pull; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.project.ProjectCache; @@ -26,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; class SourceFetchPeriodically { interface Factory { @@ -59,11 +61,14 @@ this::scheduleFetchAll, 0L, source.fetchEvery(), TimeUnit.SECONDS); } + Stream<Project.NameKey> projectsToFetch() { + return projects.all().stream().filter(source::wouldFetchProject); + } + private void scheduleFetchAll() { Optional<PullReplicationApiRequestMetrics> metrics = Optional.of(metricsProvider.get()); long repositoriesToBeFetched = - projects.all().stream() - .filter(source::wouldFetchProject) + projectsToFetch() .map( projectToFetch -> source.scheduleNow(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java index b405a6e..de1d571 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodically.java
@@ -16,15 +16,20 @@ import static com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration.DEFAULT_PERIODIC_FETCH_DISABLED; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import com.google.common.base.Suppliers; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.entities.Project; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.replication.ReplicationFilter; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.function.Supplier; @@ -36,7 +41,7 @@ private final WorkQueue workQueue; private final Provider<SourcesCollection> sources; private final Provider<SourceFetchPeriodically.Factory> fetchAllCreator; - private final List<ScheduledFuture<?>> scheduled; + private final List<ScheduledSource> scheduled; @Inject SourcesFetchPeriodically( @@ -53,7 +58,19 @@ scheduled.addAll(scheduleFetchAll(workQueue, sources.get(), fetchAllCreator.get())); } - private List<ScheduledFuture<?>> scheduleFetchAll( + ReplicationFilter skipFromReplicateAllOnPluginStart() { + Set<Project.NameKey> projectsToSkip = + scheduled.stream() + .flatMap(scheduledSource -> scheduledSource.sourceFetch.projectsToFetch()) + .collect(toSet()); + if (projectsToSkip.isEmpty()) { + return ReplicationFilter.all(); + } + + return new SkipProjectsFilter(projectsToSkip); + } + + private List<ScheduledSource> scheduleFetchAll( WorkQueue workQueue, SourcesCollection sources, SourceFetchPeriodically.Factory fetchAllCreator) { @@ -66,12 +83,41 @@ logger.atInfo().log( "Enabling periodic (every %ds) fetch of all refs for [%s] remote", source.fetchEvery(), source.getRemoteConfigName()); - return fetchAllCreator.create(source).start(queue.get()); + SourceFetchPeriodically sourceFetchAll = fetchAllCreator.create(source); + return new ScheduledSource(sourceFetchAll, sourceFetchAll.start(queue.get())); }) .collect(toList()); } void stop() { - scheduled.forEach(schedule -> schedule.cancel(true)); + scheduled.forEach(source -> source.schedule.cancel(true)); + } + + private static class ScheduledSource { + private final SourceFetchPeriodically sourceFetch; + private final ScheduledFuture<?> schedule; + + private ScheduledSource(SourceFetchPeriodically sourceFetch, ScheduledFuture<?> schedule) { + this.sourceFetch = sourceFetch; + this.schedule = schedule; + } + } + + /** + * The ReplicationFilter implementation that matches all projects that are not part of the + * internal set. + */ + private static class SkipProjectsFilter extends ReplicationFilter { + private final Set<Project.NameKey> projectsToSkip; + + private SkipProjectsFilter(Set<Project.NameKey> projectsToSkip) { + super(Collections.emptyList()); + this.projectsToSkip = projectsToSkip; + } + + @Override + public boolean matches(Project.NameKey name) { + return !projectsToSkip.contains(name); + } } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java new file mode 100644 index 0000000..47b891d --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/SourcesFetchPeriodicallyTest.java
@@ -0,0 +1,86 @@ +// Copyright (C) 2024 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.gerrit.entities.Project; +import com.google.gerrit.entities.Project.NameKey; +import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.util.Providers; +import com.googlesource.gerrit.plugins.replication.ReplicationFilter; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SourcesFetchPeriodicallyTest { + private static final NameKey RANDOM_PROJECT_NAME = Project.nameKey("random_project_name"); + + @Mock WorkQueue workQueueMock; + @Mock SourcesCollection sourcesMock; + @Mock SourceFetchPeriodically.Factory factoryMock; + + private SourcesFetchPeriodically objectUnderTest; + + @Before + public void setup() { + objectUnderTest = + new SourcesFetchPeriodically( + workQueueMock, Providers.of(sourcesMock), Providers.of(factoryMock)); + } + + @Test + public void shouldMatchAnyProjectWhenNoProjectsToFetchPeriodicallyAreConfigured() { + // given + when(sourcesMock.getAll()).thenReturn(Collections.emptyList()); + + // when + objectUnderTest.start(); + ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart(); + + // then + assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue(); + } + + @Test + public void shouldNotMatchProjectConfiguredToFetchPeriodically() { + // given + Project.NameKey projectToFetch = Project.nameKey("to_be_fetched"); + SourceFetchPeriodically fetchProjectForSource = mock(SourceFetchPeriodically.class); + when(fetchProjectForSource.projectsToFetch()).thenReturn(Stream.of(projectToFetch)); + + Source sourceWithPeriodicFetch = mock(Source.class); + when(sourceWithPeriodicFetch.fetchEvery()).thenReturn(1L); + when(sourcesMock.getAll()).thenReturn(List.of(sourceWithPeriodicFetch)); + when(factoryMock.create(any())).thenReturn(fetchProjectForSource); + + // when + objectUnderTest.start(); + ReplicationFilter filter = objectUnderTest.skipFromReplicateAllOnPluginStart(); + + // then + assertThat(filter.matches(projectToFetch)).isFalse(); + assertThat(filter.matches(RANDOM_PROJECT_NAME)).isTrue(); + } +}