Implement outstanding-tasks healthcheck
Mark an instance as healthy when all outstanding tasks (pending &
in-flight) have been completed across all sources. When a repo
filter is specified, then only the replication tasks that correspond
to the repos configured in the filter are checked.
Bug: Issue 312895374
Change-Id: Ia12ace9179860cd96fbdd81488aa2834f8ab29d6
diff --git a/BUILD b/BUILD
index 664b668..4070d7c 100644
--- a/BUILD
+++ b/BUILD
@@ -49,6 +49,7 @@
tags = ["pull-replication"],
visibility = ["//visibility:public"],
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+ ":healthcheck-neverlink",
":pull-replication__plugin",
":pull_replication_util",
"//plugins/replication",
@@ -68,6 +69,7 @@
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
":pull-replication__plugin",
"//plugins/delete-project",
+ "//plugins/healthcheck",
"//plugins/replication",
],
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0a55043..9283033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -934,14 +934,22 @@
}
}
- long inflightTasksCount() {
+ public long inflightTasksCount() {
return inFlight.size();
}
- long pendingTasksCount() {
+ public long pendingTasksCount() {
return pending.size();
}
+ public boolean zeroPendingTasksForRepo(Project.NameKey project) {
+ return pending.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+ }
+
+ public boolean zeroInflightTasksForRepo(Project.NameKey project) {
+ return inFlight.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
index 6a4eb9a..3d3be10 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.config.ConfigUtil;
@@ -24,6 +25,9 @@
import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
import com.googlesource.gerrit.plugins.healthcheck.check.AbstractHealthCheck;
import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
@@ -36,6 +40,7 @@
public static final String PERIOD_OF_TIME_FIELD = "periodOfTime";
private final Set<String> projects;
private final long periodOfTimeSec;
+ private final SourcesCollection sourcesCollection;
@Inject
public PullReplicationTasksHealthCheck(
@@ -43,7 +48,8 @@
HealthCheckConfig healthCheckConfig,
MergedConfigResource configResource,
@PluginName String name,
- MetricMaker metricMaker) {
+ MetricMaker metricMaker,
+ SourcesCollection sourcesCollection) {
super(executor, healthCheckConfig, name + HEALTHCHECK_NAME_SUFFIX, metricMaker);
String healthCheckName = name + HEALTHCHECK_NAME_SUFFIX;
@@ -60,6 +66,7 @@
PERIOD_OF_TIME_FIELD,
DEFAULT_PERIOD_OF_TIME_SECS,
TimeUnit.SECONDS);
+ this.sourcesCollection = sourcesCollection;
}
public long getPeriodOfTimeSec() {
@@ -72,6 +79,21 @@
@Override
protected Result doCheck() throws Exception {
- return Result.PASSED;
+ List<Source> sources = sourcesCollection.getAll();
+ boolean hasNoOutstandingTasks =
+ sources.stream()
+ .allMatch(
+ source -> {
+ if (projects.isEmpty()) {
+ return source.pendingTasksCount() == 0 && source.inflightTasksCount() == 0;
+ } else {
+ return projects.stream()
+ .allMatch(
+ project ->
+ source.zeroPendingTasksForRepo(Project.nameKey(project))
+ && source.zeroInflightTasksForRepo(Project.nameKey(project)));
+ }
+ });
+ return hasNoOutstandingTasks ? Result.PASSED : Result.FAILED;
}
}
diff --git a/src/main/resources/Documentation/healthcheck.md b/src/main/resources/Documentation/healthcheck.md
index aab68bd..f3001c9 100644
--- a/src/main/resources/Documentation/healthcheck.md
+++ b/src/main/resources/Documentation/healthcheck.md
@@ -2,7 +2,16 @@
==============
The @PLUGIN@ plugin registers the `pull-replication-outstanding-tasks`
-healthcheck.
+healthcheck. This check will mark a gerrit instance as healthy
+only when the node has caught up with all the outstanding
+pull-replication tasks. The goal is to mark the node as healthy when it
+is ready to receive write traffic. "Caught up" means:
+
+- All pending & in-flight replication tasks across all sources (or
+across a configurable set of repos) have completed
+
+See [Healthcheck based on replication tasks](https://issues.gerritcodereview.com/issues/312895374) for more details.
+
Health check configuration
--------------------------
@@ -16,8 +25,8 @@
The health check can be configured as follows:
- `healthcheck.@PLUGIN@-outstanding-tasks.projects`: The repo(s) that
-the health check will track pending replication tasks against. Multiple
-entries are supported.
+the health check will track outstanding replication tasks against.
+Multiple entries are supported.
- `healthcheck.@PLUGIN@-outstanding-tasks.periodOfTime`: The time for
which the check needs to be successful, in order for the instance to be
marked healthy. If the time unit is omitted it defaults to seconds.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
new file mode 100644
index 0000000..226e509
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -0,0 +1,223 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ApiModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
+import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+ name = "pull-replication",
+ sysModule =
+ "com.googlesource.gerrit.plugins.replication.pull.PullReplicationHealthCheckIT$PullReplicationHealthCheckTestModule",
+ httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationHealthCheckIT extends PullReplicationSetupBase {
+
+ public static class PullReplicationHealthCheckTestModule extends AbstractModule {
+ private final PullReplicationModule pullReplicationModule;
+
+ @Inject
+ PullReplicationHealthCheckTestModule(
+ ReplicationConfigModule configModule, InMemoryMetricMaker memMetric) {
+ this.pullReplicationModule = new PullReplicationModule(configModule, memMetric);
+ }
+
+ @Override
+ protected void configure() {
+ install(new ApiModule());
+ install(new HealthCheckExtensionApiModule());
+ install(pullReplicationModule);
+
+ DynamicSet.bind(binder(), EventListener.class)
+ .to(PullReplicationHealthCheckIT.BufferedEventListener.class)
+ .asEagerSingleton();
+ }
+ }
+
+ @Inject private SitePaths sitePaths;
+ private PullReplicationHealthCheckIT.BufferedEventListener eventListener;
+ private final int periodOfTimeSecs = 1;
+
+ @Singleton
+ public static class BufferedEventListener implements EventListener {
+
+ private final List<Event> eventsReceived;
+ private String eventTypeFilter;
+
+ @Inject
+ public BufferedEventListener() {
+ eventsReceived = new ArrayList<>();
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (event.getType().equals(eventTypeFilter)) {
+ eventsReceived.add(event);
+ }
+ }
+
+ public void clearFilter(String expectedEventType) {
+ eventsReceived.clear();
+ eventTypeFilter = expectedEventType;
+ }
+
+ public int numEventsReceived() {
+ return eventsReceived.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Event> Stream<T> eventsStream() {
+ return (Stream<T>) eventsReceived.stream();
+ }
+ }
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+
+ FileBasedConfig config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ config.setString("replication", null, "syncRefs", "ALL REFS ASYNC");
+ config.setString(
+ HealthCheckConfig.HEALTHCHECK,
+ "pull-replication" + HEALTHCHECK_NAME_SUFFIX,
+ PERIOD_OF_TIME_FIELD,
+ periodOfTimeSecs + " sec");
+ config.save();
+
+ super.setUpTestPlugin(true);
+ eventListener = getInstance(PullReplicationHealthCheckIT.BufferedEventListener.class);
+ }
+
+ @Override
+ protected boolean useBatchRefUpdateEvent() {
+ return false;
+ }
+
+ @Override
+ protected void setReplicationSource(
+ String remoteName, List<String> replicaSuffixes, Optional<String> project)
+ throws IOException {
+ List<String> fetchUrls =
+ buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+ config.setStringList("remote", remoteName, "url", fetchUrls);
+ config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+ config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+ config.setInt("remote", remoteName, "timeout", 600);
+ config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+ config.setBoolean("gerrit", null, "autoReload", true);
+ config.setInt("replication", null, "maxApiPayloadSize", 1);
+ config.save();
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+ public void shouldReportInstanceHealthyWhenThereAreNoOutstandingReplicationTasks()
+ throws Exception {
+ testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+ PullReplicationTasksHealthCheck healthcheck =
+ getInstance(PullReplicationTasksHealthCheck.class);
+
+ Ref newRef = createNewRef();
+
+ ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+ ProjectEvent event =
+ generateUpdateEvent(
+ project,
+ newRef.getName(),
+ ObjectId.zeroId().getName(),
+ newRef.getObjectId().getName(),
+ TEST_REPLICATION_REMOTE);
+
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
+ pullReplicationQueue.onEvent(event);
+ // If replication hasn't started yet, the healthcheck returns PASSED
+ // but hasReplicationFinished() would be false, ending up in producing
+ // flaky failures.
+ waitUntil(() -> hasReplicationBeenScheduledOrStarted());
+
+ waitUntil(
+ () -> {
+ boolean replicationFinished = hasReplicationFinished();
+ if (!replicationFinished) {
+ boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
+
+ assertWithMessage("Instance reported healthy while waiting for replication to finish")
+ // Racy condition: we need to make sure that this isn't a false alarm
+ // and accept the case when the replication finished between the
+ // if(!replicationFinished) check and the assertion here
+ .that(!healthCheckPassed || hasReplicationFinished())
+ .isTrue();
+ }
+ return replicationFinished;
+ });
+
+ assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.PASSED);
+ }
+
+ private boolean hasReplicationFinished() {
+ return inMemoryMetrics()
+ .counterValue("tasks/completed", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent();
+ }
+
+ private boolean hasReplicationBeenScheduledOrStarted() {
+ return inMemoryMetrics()
+ .counterValue("tasks/scheduled", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent()
+ || inMemoryMetrics()
+ .counterValue("tasks/started", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent();
+ }
+
+ private InMemoryMetricMaker inMemoryMetrics() {
+ return getInstance(InMemoryMetricMaker.class);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index c791770..5e44cb9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -39,6 +39,7 @@
import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
import com.googlesource.gerrit.plugins.replication.ApiModule;
import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
@@ -80,6 +81,7 @@
protected void configure() {
super.configure();
install(new ApiModule());
+ install(new HealthCheckExtensionApiModule());
DynamicSet.bind(binder(), EventListener.class)
.to(BufferedEventListener.class)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index 9cb2dc9..ed71545 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication.pull;
+import static com.google.gerrit.acceptance.GitUtil.assertPushOk;
+import static com.google.gerrit.acceptance.GitUtil.pushOne;
import static java.util.stream.Collectors.toList;
import com.google.common.base.Suppliers;
@@ -38,10 +40,12 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
@@ -179,4 +183,12 @@
event.instanceId = instanceId;
return event;
}
+
+ protected Ref createNewRef() throws Exception {
+ String newRef = "refs/heads/" + UUID.randomUUID();
+ RevCommit newCommit = testRepo.branch("HEAD").commit().create();
+ testRepo.branch(newRef).update(newCommit);
+ assertPushOk(pushOne(testRepo, newRef, newRef, false, false, List.of()), newRef);
+ return testRepo.getRepository().exactRef(newRef);
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
index a20f617..7bbd1e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
@@ -2,6 +2,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
import com.googlesource.gerrit.plugins.replication.ApiModule;
public class TestPullReplicationModule extends AbstractModule {
@@ -16,6 +17,7 @@
@Override
protected void configure() {
install(new ApiModule());
+ install(new HealthCheckExtensionApiModule());
install(pullReplicationModule);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
index 97eda7c..6c5b537 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
@@ -18,7 +18,10 @@
import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PROJECTS_FILTER_FIELD;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.metrics.DisabledMetricMaker;
import com.google.gerrit.metrics.MetricMaker;
@@ -30,28 +33,110 @@
import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
import com.googlesource.gerrit.plugins.replication.ConfigResource;
import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.IntStream;
import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class PullReplicationTasksHealthCheckTest {
private static final String PLUGIN_NAME = "pull-replication";
private static final String SECTION_NAME = PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX;
+ private final int periodOfCheckSec = 10;
+ @Mock private SourcesCollection sourcesCollection;
+
+ @Mock private Source source;
+ @Mock private Source anotherSource;
+
+ @Before
+ public void setUp() {
+ when(sourcesCollection.getAll()).thenReturn(List.of(source));
+ }
+
@Test
- public void shouldAlwaysPass() {
+ public void shouldReadConfig() {
List<String> projectsToCheck = List.of("foo", "bar/baz");
- int periodOfCheckSec = 10;
- Injector injector = testInjector(new TestModule(projectsToCheck, 10 + " sec"));
+ Injector injector = testInjector(new TestModule(projectsToCheck, periodOfCheckSec + " sec"));
PullReplicationTasksHealthCheck check =
injector.getInstance(PullReplicationTasksHealthCheck.class);
- assertThat(check.run().result).isEqualTo(HealthCheck.Result.PASSED);
assertThat(check.getProjects()).containsExactlyElementsIn(projectsToCheck);
assertThat(check.getPeriodOfTimeSec()).isEqualTo(periodOfCheckSec);
}
+ @Test
+ public void shouldOnlyCheckTasksForReposThatMatchTheRepoFilter() {
+ String repo = "foo";
+ when(source.zeroPendingTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+ when(source.zeroInflightTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+ lenient().when(source.zeroInflightTasksForRepo(Project.nameKey("ignored"))).thenReturn(false);
+ lenient().when(source.pendingTasksCount()).thenReturn(10L);
+
+ Injector injector = testInjector(new TestModule(List.of(repo), periodOfCheckSec + " sec"));
+
+ PullReplicationTasksHealthCheck check =
+ injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+ List<HealthCheck.Result> checkResults = runNTimes(3, check);
+ assertThat(checkResults)
+ .containsExactly(
+ HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+ }
+
+ @Test
+ public void shouldCheckAllOutstandingTasksWhenRepoFilterIsNotConfigured() {
+ List<String> noRepoFilter = List.of();
+ when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+ when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+ Injector injector = testInjector(new TestModule(noRepoFilter, periodOfCheckSec + " sec"));
+ PullReplicationTasksHealthCheck check =
+ injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+ List<HealthCheck.Result> checkResults = runNTimes(3, check);
+ assertThat(checkResults)
+ .containsExactly(
+ HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+ }
+
+ @Test
+ public void shouldCheckTasksAcrossSources() {
+ when(sourcesCollection.getAll()).thenReturn(List.of(source, anotherSource));
+ when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+ when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+ when(anotherSource.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+ when(anotherSource.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+ Injector injector = testInjector(new TestModule(List.of(), periodOfCheckSec + " sec"));
+ PullReplicationTasksHealthCheck check =
+ injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+ List<HealthCheck.Result> checkResults = runNTimes(5, check);
+ assertThat(checkResults)
+ .containsExactly(
+ HealthCheck.Result.FAILED,
+ HealthCheck.Result.FAILED,
+ HealthCheck.Result.FAILED,
+ HealthCheck.Result.FAILED,
+ HealthCheck.Result.PASSED);
+ }
+
+ private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
+ List<HealthCheck.Result> results = new ArrayList<>();
+ IntStream.rangeClosed(1, nTimes).mapToObj(i -> check.run().result).forEach(results::add);
+
+ return results;
+ }
+
private Injector testInjector(AbstractModule testModule) {
return Guice.createInjector(new HealthCheckExtensionApiModule(), testModule);
}
@@ -89,6 +174,7 @@
bind(MetricMaker.class).toInstance(new DisabledMetricMaker());
bind(HealthCheckConfig.class).toInstance(healthCheckConfig);
bind(String.class).annotatedWith(PluginName.class).toInstance(PLUGIN_NAME);
+ bind(SourcesCollection.class).toInstance(sourcesCollection);
}
}
}