Merge changes from topic "pull-replication-healthcheck"
* changes:
Always report healthy when instance has caught up with outstanding tasks
Assess healthiness by factoring in periodOfTime
Implement outstanding-tasks healthcheck
Read healthcheck settings from plugin configuration
diff --git a/BUILD b/BUILD
index 9a37098..4070d7c 100644
--- a/BUILD
+++ b/BUILD
@@ -38,6 +38,7 @@
":pull_replication_util",
"//plugins/delete-project",
"//plugins/events-broker",
+ "//plugins/healthcheck",
"//plugins/replication",
],
)
@@ -48,6 +49,7 @@
tags = ["pull-replication"],
visibility = ["//visibility:public"],
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+ ":healthcheck-neverlink",
":pull-replication__plugin",
":pull_replication_util",
"//plugins/replication",
@@ -67,6 +69,7 @@
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
":pull-replication__plugin",
"//plugins/delete-project",
+ "//plugins/healthcheck",
"//plugins/replication",
],
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0a55043..9283033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -934,14 +934,22 @@
}
}
- long inflightTasksCount() {
+  /**
+   * Returns the number of tasks currently tracked as in flight for this source.
+   *
+   * <p>Public so that the outstanding-tasks health check in the {@code health} package can read it.
+   */
+ public long inflightTasksCount() {
return inFlight.size();
}
- long pendingTasksCount() {
+  /**
+   * Returns the number of tasks currently tracked as pending for this source.
+   *
+   * <p>Public so that the outstanding-tasks health check in the {@code health} package can read it.
+   */
+ public long pendingTasksCount() {
return pending.size();
}
+ public boolean zeroPendingTasksForRepo(Project.NameKey project) {
+ return pending.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+ }
+
+ public boolean zeroInflightTasksForRepo(Project.NameKey project) {
+ return inFlight.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
index ea6fdb5..778a6cc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
@@ -14,28 +14,107 @@
package com.googlesource.gerrit.plugins.replication.pull.health;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.config.ConfigUtil;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
import com.googlesource.gerrit.plugins.healthcheck.check.AbstractHealthCheck;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
@Singleton
public class PullReplicationTasksHealthCheck extends AbstractHealthCheck {
+ private static final long DEFAULT_PERIOD_OF_TIME_SECS = 10L;
+ public static final String HEALTHCHECK_NAME_SUFFIX = "-outstanding-tasks";
+ public static final String PROJECTS_FILTER_FIELD = "projects";
+ public static final String PERIOD_OF_TIME_FIELD = "periodOfTime";
+ private final Set<String> projects;
+ private final long periodOfTimeNanos;
+ private final SourcesCollection sourcesCollection;
+ private final Ticker ticker;
+ private Optional<Long> successfulSince = Optional.empty();
+ private boolean isCaughtUp;
@Inject
public PullReplicationTasksHealthCheck(
ListeningExecutorService executor,
- HealthCheckConfig config,
+ HealthCheckConfig healthCheckConfig,
+ MergedConfigResource configResource,
@PluginName String name,
- MetricMaker metricMaker) {
- super(executor, config, name + "-tasks", metricMaker);
+ MetricMaker metricMaker,
+ SourcesCollection sourcesCollection,
+ Ticker ticker) {
+ super(executor, healthCheckConfig, name + HEALTHCHECK_NAME_SUFFIX, metricMaker);
+ String healthCheckName = name + HEALTHCHECK_NAME_SUFFIX;
+
+ Config replicationConfig = configResource.getConfig();
+ this.projects =
+ Set.of(
+ replicationConfig.getStringList(
+ HealthCheckConfig.HEALTHCHECK, healthCheckName, PROJECTS_FILTER_FIELD));
+ this.periodOfTimeNanos =
+ ConfigUtil.getTimeUnit(
+ replicationConfig,
+ HealthCheckConfig.HEALTHCHECK,
+ healthCheckName,
+ PERIOD_OF_TIME_FIELD,
+ DEFAULT_PERIOD_OF_TIME_SECS,
+ TimeUnit.NANOSECONDS);
+ this.sourcesCollection = sourcesCollection;
+ this.ticker = ticker;
+ }
+
+ public long getPeriodOfTimeNanos() {
+ return periodOfTimeNanos;
+ }
+
+ public Set<String> getProjects() {
+ return ImmutableSet.copyOf(projects);
}
@Override
protected Result doCheck() throws Exception {
- return Result.PASSED;
+ if (isCaughtUp) {
+ return Result.PASSED;
+ }
+ long checkTime = ticker.read();
+ List<Source> sources = sourcesCollection.getAll();
+ boolean hasNoOutstandingTasks =
+ sources.stream()
+ .allMatch(
+ source -> {
+ if (projects.isEmpty()) {
+ return source.pendingTasksCount() == 0 && source.inflightTasksCount() == 0;
+ } else {
+ return projects.stream()
+ .allMatch(
+ project ->
+ source.zeroPendingTasksForRepo(Project.nameKey(project))
+ && source.zeroInflightTasksForRepo(Project.nameKey(project)));
+ }
+ });
+ successfulSince =
+ hasNoOutstandingTasks ? successfulSince.or(() -> Optional.of(checkTime)) : Optional.empty();
+ return reportResult(checkTime);
+ }
+
+ private HealthCheck.Result reportResult(long checkTime) {
+ Optional<Result> maybePassed =
+ successfulSince.filter(ss -> checkTime >= ss + periodOfTimeNanos).map(ss -> Result.PASSED);
+ isCaughtUp = maybePassed.isPresent();
+ return maybePassed.orElse(Result.FAILED);
}
}
diff --git a/src/main/resources/Documentation/healthcheck.md b/src/main/resources/Documentation/healthcheck.md
new file mode 100644
index 0000000..afe773b
--- /dev/null
+++ b/src/main/resources/Documentation/healthcheck.md
@@ -0,0 +1,68 @@
+@PLUGIN@ health checks
+==============
+
+The @PLUGIN@ plugin registers the `pull-replication-outstanding-tasks`
+healthcheck. This check will mark a gerrit instance as healthy upon
+startup only when the node has caught up with all the outstanding
+pull-replication tasks. The goal is to mark the node as healthy when it
+is ready to receive write traffic. "Caught up" means:
+
+- All pending and in-flight replication tasks across all sources (or,
+when configured, only those for a specific set of repos) have completed.
+- The above condition has held continuously for at least the configured
+`periodOfTime`.
+
+See [Healthcheck based on replication tasks](https://issues.gerritcodereview.com/issues/312895374) for more details.
+
+**Note that once the health check succeeds and the instance is marked
+healthy, the check is skipped from then on (i.e. any subsequent
+invocation always reports the instance as healthy, irrespective of any
+pending or in-flight tasks).**
+
+Health check configuration
+--------------------------
+
+The configuration of the health check is split across two files.
+- The "standard" properties commonly available to all other checks
+of the `healthcheck` plugin. These are set in the `healthcheck` plugin's
+[config file](https://gerrit.googlesource.com/plugins/healthcheck/+/refs/heads/master/src/main/resources/Documentation/config.md#settings).
+- Settings specific to the check are set in the plugin's [config file](./config.md#file-pluginconfig).
+
+
+The health check can be configured as follows:
+- `healthcheck.@PLUGIN@-outstanding-tasks.projects`: The repo(s) that
+the health check will track outstanding replication tasks against.
+Multiple entries are supported. If not specified, all the outstanding
+replication tasks are tracked.
+- `healthcheck.@PLUGIN@-outstanding-tasks.periodOfTime`: The time for
+which the check needs to be successful, in order for the instance to be
+marked healthy. If the time unit is omitted it defaults to milliseconds.
+Values should use common unit suffixes to express their setting:
+
+* ms, milliseconds
+* s, sec, second, seconds
+* m, min, minute, minutes
+* h, hr, hour, hours
+
+Default is 10s.
+
+This example config reports the node healthy once there have been no
+pending or in-flight tasks for the `foo` and `bar/baz` repos,
+continuously, for a period of 5 seconds after plugin startup.
+```
+[healthcheck "@PLUGIN@-outstanding-tasks"]
+ projects = foo
+ projects = bar/baz
+ periodOfTime = 5 sec
+```
+
+Useful information
+------------------
+
+- The health check is registered only when the [healthcheck](https://gerrit.googlesource.com/plugins/healthcheck) plugin
+is installed. If the `healthcheck` plugin is not installed, then the
+check registration is skipped during load of the pull-replication
+plugin.
+- Because the pull-replication healthcheck depends on the `healthcheck` plugin, renaming/removing the `healthcheck`
+jar file is not supported during runtime. Doing so can lead to unpredictable behaviour of your gerrit instance.
+
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
new file mode 100644
index 0000000..346b7fa
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -0,0 +1,242 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+
+import com.google.common.base.Stopwatch;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ApiModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
+import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+ name = "pull-replication",
+ sysModule =
+ "com.googlesource.gerrit.plugins.replication.pull.PullReplicationHealthCheckIT$PullReplicationHealthCheckTestModule",
+ httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationHealthCheckIT extends PullReplicationSetupBase {
+ private static final int TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC = 5;
+ private static final String TEST_PLUGIN_NAME = "pull-replication";
+
+ public static class PullReplicationHealthCheckTestModule extends AbstractModule {
+ private final PullReplicationModule pullReplicationModule;
+
+ @Inject
+ PullReplicationHealthCheckTestModule(
+ ReplicationConfigModule configModule, InMemoryMetricMaker memMetric) {
+ this.pullReplicationModule = new PullReplicationModule(configModule, memMetric);
+ }
+
+ @Override
+ protected void configure() {
+ install(new ApiModule());
+ install(new HealthCheckExtensionApiModule());
+ install(pullReplicationModule);
+
+ DynamicSet.bind(binder(), EventListener.class)
+ .to(PullReplicationHealthCheckIT.BufferedEventListener.class)
+ .asEagerSingleton();
+ }
+ }
+
+ @Inject private SitePaths sitePaths;
+ private PullReplicationHealthCheckIT.BufferedEventListener eventListener;
+ private final int periodOfTimeSecs = 1;
+
+ @Singleton
+ public static class BufferedEventListener implements EventListener {
+
+ private final List<Event> eventsReceived;
+ private String eventTypeFilter;
+
+ @Inject
+ public BufferedEventListener() {
+ eventsReceived = new ArrayList<>();
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (event.getType().equals(eventTypeFilter)) {
+ eventsReceived.add(event);
+ }
+ }
+
+ public void clearFilter(String expectedEventType) {
+ eventsReceived.clear();
+ eventTypeFilter = expectedEventType;
+ }
+
+ public int numEventsReceived() {
+ return eventsReceived.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Event> Stream<T> eventsStream() {
+ return (Stream<T>) eventsReceived.stream();
+ }
+ }
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+
+ FileBasedConfig config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ config.setString("replication", null, "syncRefs", "ALL REFS ASYNC");
+ config.setString(
+ HealthCheckConfig.HEALTHCHECK,
+ "pull-replication" + HEALTHCHECK_NAME_SUFFIX,
+ PERIOD_OF_TIME_FIELD,
+ periodOfTimeSecs + " sec");
+ config.save();
+
+ super.setUpTestPlugin(true);
+ eventListener = getInstance(PullReplicationHealthCheckIT.BufferedEventListener.class);
+ }
+
+ @Override
+ protected boolean useBatchRefUpdateEvent() {
+ return false;
+ }
+
+ @Override
+ protected void setReplicationSource(
+ String remoteName, List<String> replicaSuffixes, Optional<String> project)
+ throws IOException {
+ List<String> fetchUrls =
+ buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+ config.setStringList("remote", remoteName, "url", fetchUrls);
+ config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+ config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+ config.setInt("remote", remoteName, "timeout", 600);
+ config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+ config.setBoolean("gerrit", null, "autoReload", true);
+ config.setInt("replication", null, "maxApiPayloadSize", 1);
+ config.setString(
+ HealthCheckConfig.HEALTHCHECK,
+ TEST_PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX,
+ PERIOD_OF_TIME_FIELD,
+ TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC + " sec");
+ config.save();
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+ public void shouldReportInstanceHealthyWhenThereAreNoOutstandingReplicationTasks()
+ throws Exception {
+ Stopwatch testStartStopwatch = Stopwatch.createUnstarted();
+ testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+ PullReplicationTasksHealthCheck healthcheck =
+ getInstance(PullReplicationTasksHealthCheck.class);
+
+ Ref newRef = createNewRef();
+
+ ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+ ProjectEvent event =
+ generateUpdateEvent(
+ project,
+ newRef.getName(),
+ ObjectId.zeroId().getName(),
+ newRef.getObjectId().getName(),
+ TEST_REPLICATION_REMOTE);
+
+ eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
+ pullReplicationQueue.onEvent(event);
+ // If replication hasn't started yet, the healthcheck returns PASSED
+ // but hasReplicationFinished() would be false, ending up in producing
+ // flaky failures.
+ waitUntil(() -> hasReplicationBeenScheduledOrStarted());
+
+ waitUntil(
+ () -> {
+ boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
+ boolean replicationFinished = hasReplicationFinished();
+ if (!replicationFinished) {
+ assertWithMessage("Instance reported healthy while waiting for replication to finish")
+ // Racy condition: we need to make sure that this isn't a false alarm
+ // and accept the case when the replication finished between the
+ // if(!replicationFinished) check and the assertion here
+ .that(!healthCheckPassed || hasReplicationFinished())
+ .isTrue();
+ } else {
+ if (!testStartStopwatch.isRunning()) {
+ testStartStopwatch.start();
+ }
+ }
+ return replicationFinished;
+ });
+
+ if (testStartStopwatch.elapsed(TimeUnit.SECONDS) < TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC) {
+ assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.FAILED);
+ }
+
+ waitUntil(
+ () -> testStartStopwatch.elapsed(TimeUnit.SECONDS) > TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC);
+ assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.PASSED);
+ }
+
+ private boolean hasReplicationFinished() {
+ return inMemoryMetrics()
+ .counterValue("tasks/completed", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent();
+ }
+
+ private boolean hasReplicationBeenScheduledOrStarted() {
+ return inMemoryMetrics()
+ .counterValue("tasks/scheduled", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent()
+ || inMemoryMetrics()
+ .counterValue("tasks/started", TEST_REPLICATION_REMOTE)
+ .filter(counter -> counter > 0)
+ .isPresent();
+ }
+
+ private InMemoryMetricMaker inMemoryMetrics() {
+ return getInstance(InMemoryMetricMaker.class);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index c791770..5e44cb9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -39,6 +39,7 @@
import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
import com.googlesource.gerrit.plugins.replication.ApiModule;
import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
@@ -80,6 +81,7 @@
protected void configure() {
super.configure();
install(new ApiModule());
+ install(new HealthCheckExtensionApiModule());
DynamicSet.bind(binder(), EventListener.class)
.to(BufferedEventListener.class)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index 9cb2dc9..ed71545 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication.pull;
+import static com.google.gerrit.acceptance.GitUtil.assertPushOk;
+import static com.google.gerrit.acceptance.GitUtil.pushOne;
import static java.util.stream.Collectors.toList;
import com.google.common.base.Suppliers;
@@ -38,10 +40,12 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
@@ -179,4 +183,12 @@
event.instanceId = instanceId;
return event;
}
+
+  /**
+   * Creates a commit on {@code HEAD} of the local test repository, points a freshly named branch
+   * ({@code refs/heads/<random UUID>}) at it and pushes that branch, asserting the push succeeded.
+   *
+   * @return the new branch as read back from the local test repository
+   * @throws Exception if committing, pushing or reading the ref fails
+   */
+  protected Ref createNewRef() throws Exception {
+    String branchName = "refs/heads/" + UUID.randomUUID();
+    RevCommit createdCommit = testRepo.branch("HEAD").commit().create();
+    testRepo.branch(branchName).update(createdCommit);
+    assertPushOk(pushOne(testRepo, branchName, branchName, false, false, List.of()), branchName);
+    Repository localRepository = testRepo.getRepository();
+    return localRepository.exactRef(branchName);
+  }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
index a20f617..7bbd1e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
@@ -2,6 +2,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
import com.googlesource.gerrit.plugins.replication.ApiModule;
public class TestPullReplicationModule extends AbstractModule {
@@ -16,6 +17,7 @@
@Override
protected void configure() {
install(new ApiModule());
+    // NOTE(review): presumably provides the healthcheck plugin's extension-API bindings that the
+    // pull-replication health check needs when the plugin module is installed in tests — confirm.
+ install(new HealthCheckExtensionApiModule());
install(pullReplicationModule);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
new file mode 100644
index 0000000..06fa996
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
@@ -0,0 +1,275 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.health;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PROJECTS_FILTER_FIELD;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Ticker;
+import com.google.common.testing.FakeTicker;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ConfigResource;
+import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullReplicationTasksHealthCheckTest {
+  private static final String PLUGIN_NAME = "pull-replication";
+  private static final String SECTION_NAME = PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX;
+  private static final String ZERO_PERIOD_OF_TIME = "0 sec";
+  private static final Optional<String> NO_PROJECT_FILTER = Optional.empty();
+
+  private final int periodOfTimeMillis = 10;
+  private final String periodOfTimeMillisStr = periodOfTimeMillis + " ms";
+  private final FakeTicker fakeTicker = new FakeTicker();
+  @Mock private SourcesCollection sourcesCollection;
+
+  @Mock private Source source;
+  @Mock private Source anotherSource;
+
+  @Before
+  public void setUp() {
+    // Lenient: not every test runs the check (e.g. shouldReadConfig), and strict stubs may
+    // otherwise flag this stubbing as unnecessary there.
+    lenient().when(sourcesCollection.getAll()).thenReturn(List.of(source));
+  }
+
+  @Test
+  public void shouldReadConfig() {
+    List<String> projectsToCheck = List.of("foo", "bar/baz");
+    Injector injector = testInjector(new TestModule(projectsToCheck, periodOfTimeMillisStr));
+
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(check.getProjects()).containsExactlyElementsIn(projectsToCheck);
+    assertThat(check.getPeriodOfTimeNanos())
+        .isEqualTo(TimeUnit.MILLISECONDS.toNanos(periodOfTimeMillis));
+  }
+
+  @Test
+  public void shouldOnlyCheckTasksForReposThatMatchTheRepoFilter() {
+    int numIterations = 3;
+    String repo = "foo";
+    when(source.zeroPendingTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    when(source.zeroInflightTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    // Outstanding work outside the filter must not affect the result.
+    lenient().when(source.zeroInflightTasksForRepo(Project.nameKey("ignored"))).thenReturn(false);
+    lenient().when(source.pendingTasksCount()).thenReturn(10L);
+
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(Optional.of(repo));
+
+    List<HealthCheck.Result> checkResults = runNTimes(numIterations, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckAllOutstandingTasksWhenRepoFilterIsNotConfigured() {
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
+
+    List<HealthCheck.Result> checkResults = runNTimes(3, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckTasksAcrossSources() {
+    when(sourcesCollection.getAll()).thenReturn(List.of(source, anotherSource));
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
+
+    List<HealthCheck.Result> checkResults = runNTimes(5, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldFailIfCheckDoesNotReportHealthyConsistentlyOverPeriodOfTime() {
+    long healthCheckPeriodOfTimeMsec = 50L;
+    String healthCheckPeriodOfTime = healthCheckPeriodOfTimeMsec + " ms";
+    int checkIterations = 3;
+    // Space the checks so that the first and the last are exactly one period apart: without the
+    // outstanding task observed in between, the last check would pass.
+    Duration checkInterval =
+        Duration.ofMillis(healthCheckPeriodOfTimeMsec).dividedBy(checkIterations - 1);
+    long fakeTimerStartNanos = fakeTicker.read();
+    when(source.pendingTasksCount()).thenReturn(0L).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+
+    Injector injector = testInjector(new TestModule(List.of(), healthCheckPeriodOfTime));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults =
+        runNTimes(checkIterations, check, () -> fakeTicker.advance(checkInterval));
+
+    assertThat(checkResults).doesNotContain(HealthCheck.Result.PASSED);
+    // The checks covered (at least) the whole configured period.
+    long periodEndNanos =
+        Duration.ofMillis(healthCheckPeriodOfTimeMsec).plusNanos(fakeTimerStartNanos).toNanos();
+    assertThat(fakeTicker.read()).isAtLeast(periodEndNanos);
+  }
+
+  @Test
+  public void shouldFailOnFirstInvocationEvenIfThereAreNoOutstandingTasksAndANonZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+
+    Injector injector = testInjector(new TestModule(List.of(), periodOfTimeMillisStr));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.FAILED);
+  }
+
+  @Test
+  public void
+      shouldReportHealthyOnFirstInvocationIfThereAreNoOutstandingTasksAndAZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+
+    Injector injector = testInjector(new TestModule(List.of(), ZERO_PERIOD_OF_TIME));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldAlwaysReportHealthyAfterItHasCaughtUpWithOutstandingTasks() {
+    int numCheckIterations = 2;
+    // The third value (an outstanding task) must be ignored once the check has caught up.
+    when(source.pendingTasksCount()).thenReturn(0L, 0L, 1L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+
+    Injector injector = testInjector(new TestModule(List.of(), periodOfTimeMillisStr));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(
+            runNTimes(
+                numCheckIterations,
+                check,
+                () -> fakeTicker.advance(Duration.ofMillis(periodOfTimeMillis))))
+        .containsExactly(HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  private Injector testInjector(AbstractModule testModule) {
+    return Guice.createInjector(new HealthCheckExtensionApiModule(), testModule);
+  }
+
+  /** Creates a check with a zero period, optionally filtered to {@code projectNameToCheck}. */
+  private PullReplicationTasksHealthCheck newPullReplicationTasksHealthCheck(
+      Optional<String> projectNameToCheck) {
+    return testInjector(new TestModule(projectNameToCheck.stream().toList(), ZERO_PERIOD_OF_TIME))
+        .getInstance(PullReplicationTasksHealthCheck.class);
+  }
+
+  private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
+    return runNTimes(nTimes, check, null);
+  }
+
+  /** Runs {@code check} {@code nTimes}, calling {@code postRunFunc} (if any) after each run. */
+  private List<HealthCheck.Result> runNTimes(
+      int nTimes, PullReplicationTasksHealthCheck check, @Nullable Runnable postRunFunc) {
+    List<HealthCheck.Result> results = new ArrayList<>();
+    for (int i = 0; i < nTimes; i++) {
+      results.add(check.run().result);
+      if (postRunFunc != null) {
+        postRunFunc.run();
+      }
+    }
+
+    return results;
+  }
+
+  private void mockSourceWithNoOutstandingTasks() {
+    when(source.pendingTasksCount()).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+  }
+
+  /**
+   * Guice module binding the check's dependencies: a replication config carrying the given
+   * {@code projects} filter and {@code periodOfTime}, the mocked sources and the fake ticker.
+   */
+  private class TestModule extends AbstractModule {
+    private final Config config;
+    private final MergedConfigResource configResource;
+    private final HealthCheckConfig healthCheckConfig = HealthCheckConfig.DEFAULT_CONFIG;
+
+    TestModule(List<String> projects, String periodOfTime) {
+      this.config = new Config();
+      config.setStringList(
+          HealthCheckConfig.HEALTHCHECK, SECTION_NAME, PROJECTS_FILTER_FIELD, projects);
+      config.setString(
+          HealthCheckConfig.HEALTHCHECK, SECTION_NAME, PERIOD_OF_TIME_FIELD, periodOfTime);
+      configResource =
+          MergedConfigResource.withBaseOnly(
+              new ConfigResource() {
+                @Override
+                public Config getConfig() {
+                  return config;
+                }
+
+                @Override
+                public String getVersion() {
+                  return "";
+                }
+              });
+    }
+
+    @Override
+    protected void configure() {
+      bind(Config.class).toInstance(config);
+      bind(MergedConfigResource.class).toInstance(configResource);
+      bind(MetricMaker.class).toInstance(new DisabledMetricMaker());
+      bind(HealthCheckConfig.class).toInstance(healthCheckConfig);
+      bind(String.class).annotatedWith(PluginName.class).toInstance(PLUGIN_NAME);
+      bind(SourcesCollection.class).toInstance(sourcesCollection);
+      bind(Ticker.class).toInstance(fakeTicker);
+    }
+  }
+}