Merge changes from topic "pull-replication-healthcheck"

* changes:
  Always report healthy when instance has caught up with outstanding tasks
  Assess healthiness by factoring in periodOfTime
  Implement outstanding-tasks healthcheck
  Read heathcheck settings from plugin configuration
diff --git a/BUILD b/BUILD
index 9a37098..4070d7c 100644
--- a/BUILD
+++ b/BUILD
@@ -38,6 +38,7 @@
         ":pull_replication_util",
         "//plugins/delete-project",
         "//plugins/events-broker",
+        "//plugins/healthcheck",
         "//plugins/replication",
     ],
 )
@@ -48,6 +49,7 @@
     tags = ["pull-replication"],
     visibility = ["//visibility:public"],
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+        ":healthcheck-neverlink",
         ":pull-replication__plugin",
         ":pull_replication_util",
         "//plugins/replication",
@@ -67,6 +69,7 @@
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
         "//plugins/delete-project",
+        "//plugins/healthcheck",
         "//plugins/replication",
     ],
 )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0a55043..9283033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -934,14 +934,40 @@
     }
   }
 
-  long inflightTasksCount() {
+  /** Returns the number of entries in the in-flight task map of this source. */
+  public long inflightTasksCount() {
     return inFlight.size();
   }
 
-  long pendingTasksCount() {
+  /** Returns the number of entries in the pending task map of this source. */
+  public long pendingTasksCount() {
     return pending.size();
   }
 
+  /**
+   * Returns {@code true} if none of the pending tasks of this source targets {@code project}.
+   *
+   * <p>NOTE(review): {@code pending} is iterated without synchronization here; confirm it
+   * cannot be modified concurrently by other threads.
+   */
+  public boolean zeroPendingTasksForRepo(Project.NameKey project) {
+    return pending.values().stream()
+        .map(fetch -> fetch.getProjectNameKey())
+        .noneMatch(projectKey -> projectKey.equals(project));
+  }
+
+  /**
+   * Returns {@code true} if none of the in-flight tasks of this source targets {@code project}.
+   *
+   * <p>NOTE(review): {@code inFlight} is iterated without synchronization here; confirm it
+   * cannot be modified concurrently by other threads.
+   */
+  public boolean zeroInflightTasksForRepo(Project.NameKey project) {
+    return inFlight.values().stream()
+        .map(fetch -> fetch.getProjectNameKey())
+        .noneMatch(projectKey -> projectKey.equals(project));
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
index ea6fdb5..778a6cc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
@@ -14,28 +14,149 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.health;
 
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.config.ConfigUtil;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
 import com.googlesource.gerrit.plugins.healthcheck.check.AbstractHealthCheck;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
 
+/**
+ * Health check that reports the instance as healthy only once it has caught up with the
+ * outstanding pull-replication tasks.
+ *
+ * <p>A check passes when no source has pending or in-flight tasks (only for the configured
+ * {@code projects}, if any) and this has been observed continuously for at least the configured
+ * {@code periodOfTime}. After the first success, every later invocation passes without
+ * evaluating the tasks again.
+ */
 @Singleton
 public class PullReplicationTasksHealthCheck extends AbstractHealthCheck {
+  private static final long DEFAULT_PERIOD_OF_TIME_SECS = 10L;
+  public static final String HEALTHCHECK_NAME_SUFFIX = "-outstanding-tasks";
+  public static final String PROJECTS_FILTER_FIELD = "projects";
+  public static final String PERIOD_OF_TIME_FIELD = "periodOfTime";
+  private final ImmutableSet<String> projects;
+  private final ImmutableSet<Project.NameKey> projectNameKeys;
+  private final long periodOfTimeNanos;
+  private final SourcesCollection sourcesCollection;
+  private final Ticker ticker;
+  // Ticker reading at the first of the consecutive checks that found no outstanding tasks;
+  // empty whenever the latest check found some.
+  private Optional<Long> successfulSince = Optional.empty();
+  // Set once the check has passed; from then on doCheck() returns PASSED straight away.
+  private boolean isCaughtUp;
 
+  /**
+   * Reads the {@code projects} filter and the {@code periodOfTime} of this check from the
+   * {@code healthcheck "<plugin name>-outstanding-tasks"} section of the replication config.
+   */
   @Inject
   public PullReplicationTasksHealthCheck(
       ListeningExecutorService executor,
-      HealthCheckConfig config,
+      HealthCheckConfig healthCheckConfig,
+      MergedConfigResource configResource,
       @PluginName String name,
-      MetricMaker metricMaker) {
-    super(executor, config, name + "-tasks", metricMaker);
+      MetricMaker metricMaker,
+      SourcesCollection sourcesCollection,
+      Ticker ticker) {
+    super(executor, healthCheckConfig, name + HEALTHCHECK_NAME_SUFFIX, metricMaker);
+    String healthCheckName = name + HEALTHCHECK_NAME_SUFFIX;
+
+    Config replicationConfig = configResource.getConfig();
+    // ImmutableSet.copyOf() tolerates a project listed more than once, whereas Set.of() would
+    // throw IllegalArgumentException on duplicates and make the construction of this check fail.
+    this.projects =
+        ImmutableSet.copyOf(
+            replicationConfig.getStringList(
+                HealthCheckConfig.HEALTHCHECK, healthCheckName, PROJECTS_FILTER_FIELD));
+    this.projectNameKeys =
+        projects.stream().map(Project::nameKey).collect(ImmutableSet.toImmutableSet());
+    // ConfigUtil returns the default as-is, already in the requested unit, so it has to be given
+    // in that unit. Reading in milliseconds also makes a value without a unit suffix mean
+    // milliseconds, as documented; the result is converted to nanoseconds to match the ticker.
+    this.periodOfTimeNanos =
+        TimeUnit.MILLISECONDS.toNanos(
+            ConfigUtil.getTimeUnit(
+                replicationConfig,
+                HealthCheckConfig.HEALTHCHECK,
+                healthCheckName,
+                PERIOD_OF_TIME_FIELD,
+                TimeUnit.SECONDS.toMillis(DEFAULT_PERIOD_OF_TIME_SECS),
+                TimeUnit.MILLISECONDS));
+    this.sourcesCollection = sourcesCollection;
+    this.ticker = ticker;
+  }
+
+  /** Returns the period, in nanoseconds, for which no outstanding tasks must be observed. */
+  public long getPeriodOfTimeNanos() {
+    return periodOfTimeNanos;
+  }
+
+  /** Returns the configured project filter; an empty set means that all tasks are checked. */
+  public Set<String> getProjects() {
+    return projects;
   }
 
+  /**
+   * Passes once no outstanding tasks have been observed continuously for at least the configured
+   * period of time, and keeps passing from then on.
+   */
   @Override
   protected Result doCheck() throws Exception {
-    return Result.PASSED;
+    if (isCaughtUp) {
+      return Result.PASSED;
+    }
+    long checkTime = ticker.read();
+    List<Source> sources = sourcesCollection.getAll();
+    boolean noOutstandingTasks = sources.stream().allMatch(this::hasNoOutstandingTasks);
+    successfulSince =
+        noOutstandingTasks ? successfulSince.or(() -> Optional.of(checkTime)) : Optional.empty();
+    return reportResult(checkTime);
+  }
+
+  /**
+   * Returns whether {@code source} has neither pending nor in-flight tasks, only looking at the
+   * configured projects when a project filter is set.
+   */
+  private boolean hasNoOutstandingTasks(Source source) {
+    if (projectNameKeys.isEmpty()) {
+      return source.pendingTasksCount() == 0 && source.inflightTasksCount() == 0;
+    }
+    return projectNameKeys.stream()
+        .allMatch(
+            project ->
+                source.zeroPendingTasksForRepo(project)
+                    && source.zeroInflightTasksForRepo(project));
+  }
+
+  /**
+   * Returns PASSED if the current series of checks without outstanding tasks started at least
+   * {@code periodOfTimeNanos} before {@code checkTime}, FAILED otherwise; PASSED latches {@code
+   * isCaughtUp}.
+   */
+  private HealthCheck.Result reportResult(long checkTime) {
+    // Compare elapsed time rather than absolute readings: as with System.nanoTime(), only
+    // differences of ticker values are meaningful, and "since + period" could overflow.
+    Optional<Result> maybePassed =
+        successfulSince
+            .filter(since -> checkTime - since >= periodOfTimeNanos)
+            .map(since -> Result.PASSED);
+    isCaughtUp = maybePassed.isPresent();
+    return maybePassed.orElse(Result.FAILED);
   }
 }
diff --git a/src/main/resources/Documentation/healthcheck.md b/src/main/resources/Documentation/healthcheck.md
new file mode 100644
index 0000000..afe773b
--- /dev/null
+++ b/src/main/resources/Documentation/healthcheck.md
@@ -0,0 +1,68 @@
+@PLUGIN@ health checks
+==============
+
+The @PLUGIN@ plugin registers the `pull-replication-outstanding-tasks`
+healthcheck. This check will mark a gerrit instance as healthy upon
+startup only when the node has caught up with all the outstanding
+pull-replication tasks. The goal is to mark the node as healthy when it
+is ready to receive write traffic. "Caught up" means:
+
+- All pending and in-flight replication tasks across all sources (or only
+those for a configurable set of repos) have completed, and
+- this condition has held continuously for at least a configurable period
+of time (`periodOfTime`, see below).
+
+See [Healthcheck based on replication tasks](https://issues.gerritcodereview.com/issues/312895374) for more details.
+
+**It is worth noting that once the healthcheck eventually succeeds and
+the instance is marked healthy, the check is then skipped (i.e. any
+subsequent invocations will always mark the instance as healthy
+irrespective of any pending or inflight tasks being present).**
+
+Health check configuration
+--------------------------
+
+The configuration of the health check is split across two files.
+- The "standard" properties commonly available to all other checks
+of the `healthcheck` plugin. These are set in the `healthcheck` plugin's
+[config file](https://gerrit.googlesource.com/plugins/healthcheck/+/refs/heads/master/src/main/resources/Documentation/config.md#settings).
+- Settings specific to the check are set in the plugin's [config file](./config.md#file-pluginconfig).
+
+
+The health check can be configured as follows:
+- `healthcheck.@PLUGIN@-outstanding-tasks.projects`: The repo(s) that
+the health check will track outstanding replication tasks against.
+Multiple entries are supported. If not specified, all the outstanding
+replication tasks are tracked.
+- `healthcheck.@PLUGIN@-outstanding-tasks.periodOfTime`: The time for
+which the check needs to be successful, in order for the instance to be
+marked healthy. If the time unit is omitted it defaults to milliseconds.
+Values should use common unit suffixes to express their setting:
+
+* ms, milliseconds
+* s, sec, second, seconds
+* m, min, minute, minutes
+* h, hr, hour, hours
+
+Default is 10s.
+
+This example config will report the node healthy once there have been no
+pending or in-flight tasks for the `foo` and `bar/baz` repos, as observed by
+the health check, continuously for a period of 5 seconds.
+```
+[healthcheck "pull-replication-outstanding-tasks"]
+    projects = foo
+    projects = bar/baz
+    periodOfTime = 5 sec
+```
+
+Useful information
+------------------
+
+- The health check is registered only when the [healthcheck](https://gerrit.googlesource.com/plugins/healthcheck) plugin
+is installed. If the `healthcheck` plugin is not installed, then the
+check registration is skipped during load of the pull-replication
+plugin.
+- Because the pull-replication healthcheck depends on the `healthcheck` plugin, renaming/removing the `healthcheck`
+jar file is not supported during runtime. Doing so can lead to unpredictable behaviour of your gerrit instance.
+
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
new file mode 100644
index 0000000..346b7fa
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -0,0 +1,242 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+
+import com.google.common.base.Stopwatch;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ApiModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
+import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationHealthCheckIT$PullReplicationHealthCheckTestModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationHealthCheckIT extends PullReplicationSetupBase {
+  private static final int TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC = 5;
+  private static final String TEST_PLUGIN_NAME = "pull-replication";
+
+  public static class PullReplicationHealthCheckTestModule extends AbstractModule {
+    private final PullReplicationModule pullReplicationModule;
+
+    @Inject
+    PullReplicationHealthCheckTestModule(
+        ReplicationConfigModule configModule, InMemoryMetricMaker memMetric) {
+      this.pullReplicationModule = new PullReplicationModule(configModule, memMetric);
+    }
+
+    @Override
+    protected void configure() {
+      install(new ApiModule());
+      install(new HealthCheckExtensionApiModule());
+      install(pullReplicationModule);
+
+      DynamicSet.bind(binder(), EventListener.class)
+          .to(PullReplicationHealthCheckIT.BufferedEventListener.class)
+          .asEagerSingleton();
+    }
+  }
+
+  @Inject private SitePaths sitePaths;
+  // Only its event filter is reset below; the test itself relies on the replication metrics.
+  private PullReplicationHealthCheckIT.BufferedEventListener eventListener;
+
+  @Singleton
+  public static class BufferedEventListener implements EventListener {
+    // Records the events whose type matches the filter set through clearFilter().
+    private final List<Event> eventsReceived;
+    private String eventTypeFilter;
+
+    @Inject
+    public BufferedEventListener() {
+      eventsReceived = new ArrayList<>();
+    }
+
+    @Override
+    public void onEvent(Event event) {
+      if (event.getType().equals(eventTypeFilter)) {
+        eventsReceived.add(event);
+      }
+    }
+
+    public void clearFilter(String expectedEventType) {
+      eventsReceived.clear();
+      eventTypeFilter = expectedEventType;
+    }
+
+    public int numEventsReceived() {
+      return eventsReceived.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends Event> Stream<T> eventsStream() {
+      return (Stream<T>) eventsReceived.stream();
+    }
+  }
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    // Keep periodOfTime in sync with setReplicationSource(): the timing assertions rely on it.
+    FileBasedConfig config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    config.setString("replication", null, "syncRefs", "ALL REFS ASYNC");
+    config.setString(
+        HealthCheckConfig.HEALTHCHECK,
+        TEST_PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX,
+        PERIOD_OF_TIME_FIELD,
+        TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC + " sec");
+    config.save();
+
+    super.setUpTestPlugin(true);
+    eventListener = getInstance(PullReplicationHealthCheckIT.BufferedEventListener.class);
+  }
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
+  }
+
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> fetchUrls =
+        buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+    config.setStringList("remote", remoteName, "url", fetchUrls);
+    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+    config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+    config.setInt("remote", remoteName, "timeout", 600);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.setInt("replication", null, "maxApiPayloadSize", 1);
+    config.setString(
+        HealthCheckConfig.HEALTHCHECK,
+        TEST_PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX,
+        PERIOD_OF_TIME_FIELD,
+        TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC + " sec");
+    config.save();
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  public void shouldReportInstanceHealthyWhenThereAreNoOutstandingReplicationTasks()
+      throws Exception {
+    Stopwatch testStartStopwatch = Stopwatch.createUnstarted();
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+    PullReplicationTasksHealthCheck healthcheck =
+        getInstance(PullReplicationTasksHealthCheck.class);
+
+    Ref newRef = createNewRef();
+
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    ProjectEvent event =
+        generateUpdateEvent(
+            project,
+            newRef.getName(),
+            ObjectId.zeroId().getName(),
+            newRef.getObjectId().getName(),
+            TEST_REPLICATION_REMOTE);
+
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
+    pullReplicationQueue.onEvent(event);
+    // Wait for the fetch to be scheduled first: until then nothing is outstanding, so the
+    // health check could eventually pass while hasReplicationFinished() is still false,
+    // making the assertion below fail spuriously.
+    waitUntil(() -> hasReplicationBeenScheduledOrStarted());
+
+    waitUntil(
+        () -> {
+          boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
+          boolean replicationFinished = hasReplicationFinished();
+          if (!replicationFinished) {
+            assertWithMessage("Instance reported healthy while waiting for replication to finish")
+                // Racy condition: we need to make sure that this isn't a false alarm
+                // and accept the case when the replication finished between the
+                // if(!replicationFinished) check and the assertion here
+                .that(!healthCheckPassed || hasReplicationFinished())
+                .isTrue();
+          } else {
+            if (!testStartStopwatch.isRunning()) {
+              testStartStopwatch.start();
+            }
+          }
+          return replicationFinished;
+        });
+
+    if (testStartStopwatch.elapsed(TimeUnit.SECONDS) < TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC) {
+      assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.FAILED);
+    }
+
+    waitUntil(
+        () -> testStartStopwatch.elapsed(TimeUnit.SECONDS) > TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC);
+    assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  private boolean hasReplicationFinished() {
+    return inMemoryMetrics()
+        .counterValue("tasks/completed", TEST_REPLICATION_REMOTE)
+        .filter(counter -> counter > 0)
+        .isPresent();
+  }
+
+  private boolean hasReplicationBeenScheduledOrStarted() {
+    return inMemoryMetrics()
+            .counterValue("tasks/scheduled", TEST_REPLICATION_REMOTE)
+            .filter(counter -> counter > 0)
+            .isPresent()
+        || inMemoryMetrics()
+            .counterValue("tasks/started", TEST_REPLICATION_REMOTE)
+            .filter(counter -> counter > 0)
+            .isPresent();
+  }
+
+  private InMemoryMetricMaker inMemoryMetrics() {
+    return getInstance(InMemoryMetricMaker.class);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index c791770..5e44cb9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -39,6 +39,7 @@
 import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
 import com.googlesource.gerrit.plugins.replication.ApiModule;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
@@ -80,6 +81,7 @@
     protected void configure() {
       super.configure();
       install(new ApiModule());
+      install(new HealthCheckExtensionApiModule());
 
       DynamicSet.bind(binder(), EventListener.class)
           .to(BufferedEventListener.class)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index 9cb2dc9..ed71545 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.google.gerrit.acceptance.GitUtil.assertPushOk;
+import static com.google.gerrit.acceptance.GitUtil.pushOne;
 import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Suppliers;
@@ -38,10 +40,12 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
@@ -179,4 +183,18 @@
     event.instanceId = instanceId;
     return event;
   }
+
+  /**
+   * Creates a commit on the local {@code HEAD} branch of {@code testRepo}, points a new branch
+   * with a random name at it and pushes that branch, asserting that the push succeeded.
+   *
+   * @return the new branch ref as seen by the local test repository
+   */
+  protected Ref createNewRef() throws Exception {
+    String refName = "refs/heads/" + UUID.randomUUID();
+    RevCommit tip = testRepo.branch("HEAD").commit().create();
+    testRepo.branch(refName).update(tip);
+    assertPushOk(pushOne(testRepo, refName, refName, false, false, List.of()), refName);
+    return testRepo.getRepository().exactRef(refName);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
index a20f617..7bbd1e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
@@ -2,6 +2,7 @@
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
 import com.googlesource.gerrit.plugins.replication.ApiModule;
 
 public class TestPullReplicationModule extends AbstractModule {
@@ -16,6 +17,7 @@
   @Override
   protected void configure() {
     install(new ApiModule());
+    install(new HealthCheckExtensionApiModule()); // healthcheck plugin extension API bindings
     install(pullReplicationModule);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
new file mode 100644
index 0000000..06fa996
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
@@ -0,0 +1,275 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.health;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PROJECTS_FILTER_FIELD;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Ticker;
+import com.google.common.testing.FakeTicker;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ConfigResource;
+import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullReplicationTasksHealthCheckTest {
+  private static final String PLUGIN_NAME = "pull-replication";
+  private static final String SECTION_NAME = PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX;
+  private static final String ZERO_PERIOD_OF_TIME = "0 sec";
+  private static final Optional<String> NO_PROJECT_FILTER = Optional.empty();
+
+  private final int periodOfTimeMillis = 10;
+  private final String periodOfTimeMillisStr = periodOfTimeMillis + " ms";
+  private final FakeTicker fakeTicker = new FakeTicker();
+  @Mock private SourcesCollection sourcesCollection;
+
+  @Mock private Source source;
+  @Mock private Source anotherSource;
+
+  @Before
+  public void setUp() {
+    when(sourcesCollection.getAll()).thenReturn(List.of(source)); // default: a single source
+  }
+
+  @Test
+  public void shouldReadConfig() {
+    List<String> projectsToCheck = List.of("foo", "bar/baz");
+    Injector injector = testInjector(new TestModule(projectsToCheck, periodOfTimeMillisStr));
+
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+    // Expect the configured project filter and period (converted to nanoseconds).
+    assertThat(check.getProjects()).containsExactlyElementsIn(projectsToCheck);
+    assertThat(check.getPeriodOfTimeNanos())
+        .isEqualTo(TimeUnit.MILLISECONDS.toNanos(periodOfTimeMillis));
+  }
+
+  @Test
+  public void shouldOnlyCheckTasksForReposThatMatchTheRepoFilter() {
+    int numIterations = 3;
+    String repo = "foo";
+    when(source.zeroPendingTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    when(source.zeroInflightTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    lenient().when(source.zeroInflightTasksForRepo(Project.nameKey("ignored"))).thenReturn(false);
+    lenient().when(source.pendingTasksCount()).thenReturn(10L);
+    // Stubs for another repo and for the global counters must not influence the result.
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(Optional.of(repo));
+    // FAILED (pending tasks), FAILED (in-flight tasks), then PASSED once "foo" is idle.
+    List<HealthCheck.Result> checkResults = runNTimes(numIterations, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckAllOutstandingTasksWhenRepoFilterIsNotConfigured() {
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+    // Without a project filter the global pending/in-flight counters are checked.
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
+    // FAILED (pending tasks), FAILED (in-flight tasks), then PASSED.
+    List<HealthCheck.Result> checkResults = runNTimes(3, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckTasksAcrossSources() {
+    when(sourcesCollection.getAll()).thenReturn(List.of(source, anotherSource));
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+    // Each source reports a pending task, then an in-flight task, then none.
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
+    // Sources are evaluated one after the other, so only the 5th run sees both of them idle.
+    List<HealthCheck.Result> checkResults = runNTimes(5, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldFailIfCheckDoesNotReportHealthyConsistentlyOverPeriodOfTime() {
+    long periodMsec = 60L;
+    String healthCheckPeriodOfTime = periodMsec + " ms";
+    int checkIterations = 4;
+    // Last check lands a full period after the first; only the 1-task dip may prevent PASSED.
+    Duration checkInterval = Duration.ofMillis(periodMsec).dividedBy(checkIterations - 1);
+    long fakeTimerStartNanos = fakeTicker.read();
+    when(source.pendingTasksCount()).thenReturn(0L).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+
+    Injector injector = testInjector(new TestModule(List.of(), healthCheckPeriodOfTime));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults =
+        runNTimes(checkIterations, check, () -> fakeTicker.advance(checkInterval));
+
+    assertThat(checkResults).doesNotContain(HealthCheck.Result.PASSED);
+    assertThat(fakeTicker.read())
+        .isAtLeast(Duration.ofMillis(periodMsec).plusNanos(fakeTimerStartNanos).toNanos());
+  }
+
+  @Test
+  public void shouldFailOnFirstInvocationEvenIfThereAreNoOutstandingTasksAndANonZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+    // A non-zero periodOfTime has to elapse before the check can pass.
+    Injector injector = testInjector(new TestModule(List.of(), periodOfTimeMillisStr));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+    // The fake ticker never advances here, so the period cannot have elapsed yet.
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.FAILED);
+  }
+
+  @Test
+  public void
+      shouldReportHealthyOnFirstInvocationIfThereAreNoOutstandingTasksAndAZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+    PullReplicationTasksHealthCheck check =
+        testInjector(new TestModule(List.of(), ZERO_PERIOD_OF_TIME))
+            .getInstance(PullReplicationTasksHealthCheck.class);
+
+    // With a zero period of time, a single observation without outstanding tasks suffices.
+    HealthCheck.Result firstResult = check.run().result;
+    assertThat(firstResult).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldAlwaysReportHealthyAfterItHasCaughtUpWithOutstandingTasks() {
+    // Two observations without outstanding tasks, one full period apart, are expected to catch
+    // up; the outstanding task stubbed for the third call must not make the check fail again.
+    when(source.pendingTasksCount()).thenReturn(0L, 0L, 1L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+    Duration onePeriod = Duration.ofMillis(periodOfTimeMillis);
+
+    PullReplicationTasksHealthCheck check =
+        testInjector(new TestModule(List.of(), periodOfTimeMillisStr))
+            .getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> untilCaughtUp =
+        runNTimes(2, check, () -> fakeTicker.advance(onePeriod));
+    assertThat(untilCaughtUp)
+        .containsExactly(HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+
+    HealthCheck.Result afterCatchUp = check.run().result;
+    assertThat(afterCatchUp).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  /**
+   * Creates an injector combining the healthcheck extension API module with {@code testModule}.
+   */
+  private Injector testInjector(AbstractModule testModule) {
+    Injector injector = Guice.createInjector(new HealthCheckExtensionApiModule(), testModule);
+    return injector;
+  }
+
+  /** Runs {@code check} {@code nTimes} times back to back, with no action between runs. */
+  private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
+    Runnable noPostRunAction = null;
+    return runNTimes(nTimes, check, noPostRunAction);
+  }
+
+  /**
+   * Creates a health check configured with a zero period of time and, when present, a projects
+   * filter containing only {@code projectNameToCheck}.
+   */
+  private PullReplicationTasksHealthCheck newPullReplicationTasksHealthCheck(
+      Optional<String> projectNameToCheck) {
+    List<String> projectsFilter = projectNameToCheck.stream().toList();
+    return testInjector(new TestModule(projectsFilter, ZERO_PERIOD_OF_TIME))
+        .getInstance(PullReplicationTasksHealthCheck.class);
+  }
+
+  /**
+   * Runs {@code check} {@code nTimes} times and collects the results in invocation order.
+   *
+   * @param postRunFunc action executed after every run, including the last one (e.g. advancing
+   *     the fake ticker); {@code null} means nothing is executed between runs.
+   */
+  private List<HealthCheck.Result> runNTimes(
+      int nTimes, PullReplicationTasksHealthCheck check, @Nullable Runnable postRunFunc) {
+    Runnable afterEachRun = postRunFunc == null ? () -> {} : postRunFunc;
+    List<HealthCheck.Result> collected = new ArrayList<>();
+    for (int run = 1; run <= nTimes; run++) {
+      collected.add(check.run().result);
+      afterEachRun.run();
+    }
+    return collected;
+  }
+
+  /** Stubs {@code source} so that it reports neither in-flight nor pending tasks. */
+  private void mockSourceWithNoOutstandingTasks() {
+    when(source.inflightTasksCount()).thenReturn(0L);
+    when(source.pendingTasksCount()).thenReturn(0L);
+  }
+
+  /**
+   * Guice module providing the bindings the health check under test is injected with. The
+   * healthcheck plugin configuration carries the given projects filter and period of time.
+   *
+   * <p>Non-static on purpose: it binds the enclosing test's {@code sourcesCollection} and
+   * {@code fakeTicker}.
+   */
+  private class TestModule extends AbstractModule {
+    private final Config config;
+    private final MergedConfigResource configResource;
+    private final HealthCheckConfig healthCheckConfig = HealthCheckConfig.DEFAULT_CONFIG;
+
+    public TestModule(List<String> projects, String periodOfTime) {
+      Config pluginConfig = new Config();
+      pluginConfig.setStringList(
+          HealthCheckConfig.HEALTHCHECK, SECTION_NAME, PROJECTS_FILTER_FIELD, projects);
+      pluginConfig.setString(
+          HealthCheckConfig.HEALTHCHECK, SECTION_NAME, PERIOD_OF_TIME_FIELD, periodOfTime);
+      this.config = pluginConfig;
+      this.configResource =
+          MergedConfigResource.withBaseOnly(
+              new ConfigResource() {
+                @Override
+                public Config getConfig() {
+                  return pluginConfig;
+                }
+
+                @Override
+                public String getVersion() {
+                  return "";
+                }
+              });
+    }
+
+    @Override
+    protected void configure() {
+      bind(Config.class).toInstance(config);
+      bind(MergedConfigResource.class).toInstance(configResource);
+      bind(MetricMaker.class).toInstance(new DisabledMetricMaker());
+      bind(HealthCheckConfig.class).toInstance(healthCheckConfig);
+      bind(String.class).annotatedWith(PluginName.class).toInstance(PLUGIN_NAME);
+      bind(SourcesCollection.class).toInstance(sourcesCollection);
+      bind(Ticker.class).toInstance(fakeTicker);
+    }
+  }
+}