Implement outstanding-tasks healthcheck

Mark an instance as healthy when all outstanding tasks (pending &
in-flight) have been completed across all sources. When a repo
filter is specified, then only the replication tasks that correspond
to the repos configured in the filter are checked.

Bug: Issue 312895374
Change-Id: Ia12ace9179860cd96fbdd81488aa2834f8ab29d6
diff --git a/BUILD b/BUILD
index 664b668..4070d7c 100644
--- a/BUILD
+++ b/BUILD
@@ -49,6 +49,7 @@
     tags = ["pull-replication"],
     visibility = ["//visibility:public"],
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+        ":healthcheck-neverlink",
         ":pull-replication__plugin",
         ":pull_replication_util",
         "//plugins/replication",
@@ -68,6 +69,7 @@
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
         "//plugins/delete-project",
+        "//plugins/healthcheck",
         "//plugins/replication",
     ],
 )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0a55043..9283033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -934,14 +934,22 @@
     }
   }
 
-  long inflightTasksCount() {
+  public long inflightTasksCount() {
     return inFlight.size();
   }
 
-  long pendingTasksCount() {
+  public long pendingTasksCount() {
     return pending.size();
   }
 
+  public boolean zeroPendingTasksForRepo(Project.NameKey project) {
+    return pending.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+  }
+
+  public boolean zeroInflightTasksForRepo(Project.NameKey project) {
+    return inFlight.values().stream().noneMatch(fetch -> fetch.getProjectNameKey().equals(project));
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
index 6a4eb9a..3d3be10 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
@@ -16,6 +16,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.config.ConfigUtil;
@@ -24,6 +25,9 @@
 import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
 import com.googlesource.gerrit.plugins.healthcheck.check.AbstractHealthCheck;
 import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
@@ -36,6 +40,7 @@
   public static final String PERIOD_OF_TIME_FIELD = "periodOfTime";
   private final Set<String> projects;
   private final long periodOfTimeSec;
+  private final SourcesCollection sourcesCollection;
 
   @Inject
   public PullReplicationTasksHealthCheck(
@@ -43,7 +48,8 @@
       HealthCheckConfig healthCheckConfig,
       MergedConfigResource configResource,
       @PluginName String name,
-      MetricMaker metricMaker) {
+      MetricMaker metricMaker,
+      SourcesCollection sourcesCollection) {
     super(executor, healthCheckConfig, name + HEALTHCHECK_NAME_SUFFIX, metricMaker);
     String healthCheckName = name + HEALTHCHECK_NAME_SUFFIX;
 
@@ -60,6 +66,7 @@
             PERIOD_OF_TIME_FIELD,
             DEFAULT_PERIOD_OF_TIME_SECS,
             TimeUnit.SECONDS);
+    this.sourcesCollection = sourcesCollection;
   }
 
   public long getPeriodOfTimeSec() {
@@ -72,6 +79,21 @@
 
   @Override
   protected Result doCheck() throws Exception {
-    return Result.PASSED;
+    List<Source> sources = sourcesCollection.getAll();
+    boolean hasNoOutstandingTasks =
+        sources.stream()
+            .allMatch(
+                source -> {
+                  if (projects.isEmpty()) {
+                    return source.pendingTasksCount() == 0 && source.inflightTasksCount() == 0;
+                  } else {
+                    return projects.stream()
+                        .allMatch(
+                            project ->
+                                source.zeroPendingTasksForRepo(Project.nameKey(project))
+                                    && source.zeroInflightTasksForRepo(Project.nameKey(project)));
+                  }
+                });
+    return hasNoOutstandingTasks ? Result.PASSED : Result.FAILED;
   }
 }
diff --git a/src/main/resources/Documentation/healthcheck.md b/src/main/resources/Documentation/healthcheck.md
index aab68bd..f3001c9 100644
--- a/src/main/resources/Documentation/healthcheck.md
+++ b/src/main/resources/Documentation/healthcheck.md
@@ -2,7 +2,16 @@
 ==============
 
 The @PLUGIN@ plugin registers the `pull-replication-outstanding-tasks`
-healthcheck.
+healthcheck. This check will mark a gerrit instance as healthy
+only when the node has caught up with all the outstanding
+pull-replication tasks. The goal is to mark the node as healthy when it
+is ready to receive write traffic. "Caught up" means:
+
+- All pending & in-flight replication tasks across all sources (or
+across a configurable set of repos) have completed
+
+See [Healthcheck based on replication tasks](https://issues.gerritcodereview.com/issues/312895374) for more details.
+
 
 Health check configuration
 --------------------------
@@ -16,8 +25,8 @@
 
 The health check can be configured as follows:
 - `healthcheck.@PLUGIN@-outstanding-tasks.projects`: The repo(s) that
-the health check will track pending replication tasks against. Multiple
-entries are supported.
+the health check will track outstanding replication tasks against.
+Multiple entries are supported.
 - `healthcheck.@PLUGIN@-outstanding-tasks.periodOfTime`: The time for
 which the check needs to be successful, in order for the instance to be
 marked healthy. If the time unit is omitted it defaults to seconds.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
new file mode 100644
index 0000000..226e509
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -0,0 +1,223 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
+import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
+import com.googlesource.gerrit.plugins.replication.ApiModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
+import com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationHealthCheckIT$PullReplicationHealthCheckTestModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationHealthCheckIT extends PullReplicationSetupBase {
+
+  public static class PullReplicationHealthCheckTestModule extends AbstractModule {
+    private final PullReplicationModule pullReplicationModule;
+
+    @Inject
+    PullReplicationHealthCheckTestModule(
+        ReplicationConfigModule configModule, InMemoryMetricMaker memMetric) {
+      this.pullReplicationModule = new PullReplicationModule(configModule, memMetric);
+    }
+
+    @Override
+    protected void configure() {
+      install(new ApiModule());
+      install(new HealthCheckExtensionApiModule());
+      install(pullReplicationModule);
+
+      DynamicSet.bind(binder(), EventListener.class)
+          .to(PullReplicationHealthCheckIT.BufferedEventListener.class)
+          .asEagerSingleton();
+    }
+  }
+
+  @Inject private SitePaths sitePaths;
+  private PullReplicationHealthCheckIT.BufferedEventListener eventListener;
+  private final int periodOfTimeSecs = 1;
+
+  @Singleton
+  public static class BufferedEventListener implements EventListener {
+
+    private final List<Event> eventsReceived;
+    private String eventTypeFilter;
+
+    @Inject
+    public BufferedEventListener() {
+      eventsReceived = new ArrayList<>();
+    }
+
+    @Override
+    public void onEvent(Event event) {
+      if (event.getType().equals(eventTypeFilter)) {
+        eventsReceived.add(event);
+      }
+    }
+
+    public void clearFilter(String expectedEventType) {
+      eventsReceived.clear();
+      eventTypeFilter = expectedEventType;
+    }
+
+    public int numEventsReceived() {
+      return eventsReceived.size();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends Event> Stream<T> eventsStream() {
+      return (Stream<T>) eventsReceived.stream();
+    }
+  }
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+
+    FileBasedConfig config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    config.setString("replication", null, "syncRefs", "ALL REFS ASYNC");
+    config.setString(
+        HealthCheckConfig.HEALTHCHECK,
+        "pull-replication" + HEALTHCHECK_NAME_SUFFIX,
+        PERIOD_OF_TIME_FIELD,
+        periodOfTimeSecs + " sec");
+    config.save();
+
+    super.setUpTestPlugin(true);
+    eventListener = getInstance(PullReplicationHealthCheckIT.BufferedEventListener.class);
+  }
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
+  }
+
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> fetchUrls =
+        buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+    config.setStringList("remote", remoteName, "url", fetchUrls);
+    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+    config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+    config.setInt("remote", remoteName, "timeout", 600);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.setInt("replication", null, "maxApiPayloadSize", 1);
+    config.save();
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  public void shouldReportInstanceHealthyWhenThereAreNoOutstandingReplicationTasks()
+      throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+    PullReplicationTasksHealthCheck healthcheck =
+        getInstance(PullReplicationTasksHealthCheck.class);
+
+    Ref newRef = createNewRef();
+
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    ProjectEvent event =
+        generateUpdateEvent(
+            project,
+            newRef.getName(),
+            ObjectId.zeroId().getName(),
+            newRef.getObjectId().getName(),
+            TEST_REPLICATION_REMOTE);
+
+    eventListener.clearFilter(FetchRefReplicatedEvent.TYPE);
+    pullReplicationQueue.onEvent(event);
+    // If replication hasn't started yet, the healthcheck returns PASSED
+    // but hasReplicationFinished() would be false, ending up in producing
+    // flaky failures.
+    waitUntil(() -> hasReplicationBeenScheduledOrStarted());
+
+    waitUntil(
+        () -> {
+          boolean replicationFinished = hasReplicationFinished();
+          if (!replicationFinished) {
+            boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
+
+            assertWithMessage("Instance reported healthy while waiting for replication to finish")
+                // Racy condition: we need to make sure that this isn't a false alarm
+                // and accept the case when the replication finished between the
+                // if(!replicationFinished) check and the assertion here
+                .that(!healthCheckPassed || hasReplicationFinished())
+                .isTrue();
+          }
+          return replicationFinished;
+        });
+
+    assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.PASSED);
+  }
+
+  private boolean hasReplicationFinished() {
+    return inMemoryMetrics()
+        .counterValue("tasks/completed", TEST_REPLICATION_REMOTE)
+        .filter(counter -> counter > 0)
+        .isPresent();
+  }
+
+  private boolean hasReplicationBeenScheduledOrStarted() {
+    return inMemoryMetrics()
+            .counterValue("tasks/scheduled", TEST_REPLICATION_REMOTE)
+            .filter(counter -> counter > 0)
+            .isPresent()
+        || inMemoryMetrics()
+            .counterValue("tasks/started", TEST_REPLICATION_REMOTE)
+            .filter(counter -> counter > 0)
+            .isPresent();
+  }
+
+  private InMemoryMetricMaker inMemoryMetrics() {
+    return getInstance(InMemoryMetricMaker.class);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index c791770..5e44cb9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -39,6 +39,7 @@
 import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
 import com.googlesource.gerrit.plugins.replication.ApiModule;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
@@ -80,6 +81,7 @@
     protected void configure() {
       super.configure();
       install(new ApiModule());
+      install(new HealthCheckExtensionApiModule());
 
       DynamicSet.bind(binder(), EventListener.class)
           .to(BufferedEventListener.class)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index 9cb2dc9..ed71545 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.google.gerrit.acceptance.GitUtil.assertPushOk;
+import static com.google.gerrit.acceptance.GitUtil.pushOne;
 import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Suppliers;
@@ -38,10 +40,12 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
@@ -179,4 +183,12 @@
     event.instanceId = instanceId;
     return event;
   }
+
+  protected Ref createNewRef() throws Exception {
+    String newRef = "refs/heads/" + UUID.randomUUID();
+    RevCommit newCommit = testRepo.branch("HEAD").commit().create();
+    testRepo.branch(newRef).update(newCommit);
+    assertPushOk(pushOne(testRepo, newRef, newRef, false, false, List.of()), newRef);
+    return testRepo.getRepository().exactRef(newRef);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
index a20f617..7bbd1e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/TestPullReplicationModule.java
@@ -2,6 +2,7 @@
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.healthcheck.HealthCheckExtensionApiModule;
 import com.googlesource.gerrit.plugins.replication.ApiModule;
 
 public class TestPullReplicationModule extends AbstractModule {
@@ -16,6 +17,7 @@
   @Override
   protected void configure() {
     install(new ApiModule());
+    install(new HealthCheckExtensionApiModule());
     install(pullReplicationModule);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
index 97eda7c..6c5b537 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
@@ -18,7 +18,10 @@
 import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
 import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
 import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PROJECTS_FILTER_FIELD;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
 
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.metrics.MetricMaker;
@@ -30,28 +33,110 @@
 import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
 import com.googlesource.gerrit.plugins.replication.ConfigResource;
 import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.IntStream;
 import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class PullReplicationTasksHealthCheckTest {
   private static final String PLUGIN_NAME = "pull-replication";
   private static final String SECTION_NAME = PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX;
 
+  private final int periodOfCheckSec = 10;
+  @Mock private SourcesCollection sourcesCollection;
+
+  @Mock private Source source;
+  @Mock private Source anotherSource;
+
+  @Before
+  public void setUp() {
+    when(sourcesCollection.getAll()).thenReturn(List.of(source));
+  }
+
   @Test
-  public void shouldAlwaysPass() {
+  public void shouldReadConfig() {
     List<String> projectsToCheck = List.of("foo", "bar/baz");
-    int periodOfCheckSec = 10;
-    Injector injector = testInjector(new TestModule(projectsToCheck, 10 + " sec"));
+    Injector injector = testInjector(new TestModule(projectsToCheck, periodOfCheckSec + " sec"));
 
     PullReplicationTasksHealthCheck check =
         injector.getInstance(PullReplicationTasksHealthCheck.class);
 
-    assertThat(check.run().result).isEqualTo(HealthCheck.Result.PASSED);
     assertThat(check.getProjects()).containsExactlyElementsIn(projectsToCheck);
     assertThat(check.getPeriodOfTimeSec()).isEqualTo(periodOfCheckSec);
   }
 
+  @Test
+  public void shouldOnlyCheckTasksForReposThatMatchTheRepoFilter() {
+    String repo = "foo";
+    when(source.zeroPendingTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    when(source.zeroInflightTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
+    lenient().when(source.zeroInflightTasksForRepo(Project.nameKey("ignored"))).thenReturn(false);
+    lenient().when(source.pendingTasksCount()).thenReturn(10L);
+
+    Injector injector = testInjector(new TestModule(List.of(repo), periodOfCheckSec + " sec"));
+
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults = runNTimes(3, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckAllOutstandingTasksWhenRepoFilterIsNotConfigured() {
+    List<String> noRepoFilter = List.of();
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+    Injector injector = testInjector(new TestModule(noRepoFilter, periodOfCheckSec + " sec"));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults = runNTimes(3, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
+  }
+
+  @Test
+  public void shouldCheckTasksAcrossSources() {
+    when(sourcesCollection.getAll()).thenReturn(List.of(source, anotherSource));
+    when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
+    when(anotherSource.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
+
+    Injector injector = testInjector(new TestModule(List.of(), periodOfCheckSec + " sec"));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults = runNTimes(5, check);
+    assertThat(checkResults)
+        .containsExactly(
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.FAILED,
+            HealthCheck.Result.PASSED);
+  }
+
+  private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
+    List<HealthCheck.Result> results = new ArrayList<>();
+    IntStream.rangeClosed(1, nTimes).mapToObj(i -> check.run().result).forEach(results::add);
+
+    return results;
+  }
+
   private Injector testInjector(AbstractModule testModule) {
     return Guice.createInjector(new HealthCheckExtensionApiModule(), testModule);
   }
@@ -89,6 +174,7 @@
       bind(MetricMaker.class).toInstance(new DisabledMetricMaker());
       bind(HealthCheckConfig.class).toInstance(healthCheckConfig);
       bind(String.class).annotatedWith(PluginName.class).toInstance(PLUGIN_NAME);
+      bind(SourcesCollection.class).toInstance(sourcesCollection);
     }
   }
 }