Assess healthiness by factoring in periodOfTime

Modify the health check so that it reports healthy only when there are
no outstanding tasks for a duration of at least N, where N corresponds to the
value set in the `periodOfTime` setting.

Bug: Issue 312895374
Change-Id: Id3fe47605e6a82ca09e79f52b224af7beb77e2f4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
index 3d3be10..a621778 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheck.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.health;
 
+import com.google.common.base.Ticker;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.gerrit.entities.Project;
@@ -24,10 +25,12 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.healthcheck.HealthCheckConfig;
 import com.googlesource.gerrit.plugins.healthcheck.check.AbstractHealthCheck;
+import com.googlesource.gerrit.plugins.healthcheck.check.HealthCheck;
 import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
@@ -39,8 +42,10 @@
   public static final String PROJECTS_FILTER_FIELD = "projects";
   public static final String PERIOD_OF_TIME_FIELD = "periodOfTime";
   private final Set<String> projects;
-  private final long periodOfTimeSec;
+  private final long periodOfTimeNanos;
   private final SourcesCollection sourcesCollection;
+  private final Ticker ticker;
+  private Optional<Long> successfulSince = Optional.empty();
 
   @Inject
   public PullReplicationTasksHealthCheck(
@@ -49,7 +54,8 @@
       MergedConfigResource configResource,
       @PluginName String name,
       MetricMaker metricMaker,
-      SourcesCollection sourcesCollection) {
+      SourcesCollection sourcesCollection,
+      Ticker ticker) {
     super(executor, healthCheckConfig, name + HEALTHCHECK_NAME_SUFFIX, metricMaker);
     String healthCheckName = name + HEALTHCHECK_NAME_SUFFIX;
 
@@ -58,19 +64,20 @@
         Set.of(
             replicationConfig.getStringList(
                 HealthCheckConfig.HEALTHCHECK, healthCheckName, PROJECTS_FILTER_FIELD));
-    this.periodOfTimeSec =
+    this.periodOfTimeNanos =
         ConfigUtil.getTimeUnit(
             replicationConfig,
             HealthCheckConfig.HEALTHCHECK,
             healthCheckName,
             PERIOD_OF_TIME_FIELD,
             DEFAULT_PERIOD_OF_TIME_SECS,
-            TimeUnit.SECONDS);
+            TimeUnit.NANOSECONDS);
     this.sourcesCollection = sourcesCollection;
+    this.ticker = ticker;
   }
 
-  public long getPeriodOfTimeSec() {
-    return periodOfTimeSec;
+  public long getPeriodOfTimeNanos() {
+    return periodOfTimeNanos;
   }
 
   public Set<String> getProjects() {
@@ -79,6 +86,7 @@
 
   @Override
   protected Result doCheck() throws Exception {
+    long checkTime = ticker.read();
     List<Source> sources = sourcesCollection.getAll();
     boolean hasNoOutstandingTasks =
         sources.stream()
@@ -94,6 +102,15 @@
                                     && source.zeroInflightTasksForRepo(Project.nameKey(project)));
                   }
                 });
-    return hasNoOutstandingTasks ? Result.PASSED : Result.FAILED;
+    successfulSince =
+        hasNoOutstandingTasks ? successfulSince.or(() -> Optional.of(checkTime)) : Optional.empty();
+    return reportResult(checkTime);
+  }
+
+  private HealthCheck.Result reportResult(long checkTime) {
+    return successfulSince
+        .filter(ss -> checkTime >= ss + periodOfTimeNanos)
+        .map(ss -> Result.PASSED)
+        .orElse(Result.FAILED);
   }
 }
diff --git a/src/main/resources/Documentation/healthcheck.md b/src/main/resources/Documentation/healthcheck.md
index f3001c9..3b84642 100644
--- a/src/main/resources/Documentation/healthcheck.md
+++ b/src/main/resources/Documentation/healthcheck.md
@@ -9,6 +9,8 @@
 
 - All pending & in-flight replication tasks across all sources (or
 across a configurable set of repos) have completed
+- There are no queued replication tasks pending and the above condition
+has held continuously for at least N seconds (N is configurable)
 
 See [Healthcheck based on replication tasks](https://issues.gerritcodereview.com/issues/312895374) for more details.
 
@@ -26,10 +28,11 @@
 The health check can be configured as follows:
 - `healthcheck.@PLUGIN@-outstanding-tasks.projects`: The repo(s) that
 the health check will track outstanding replication tasks against.
-Multiple entries are supported.
+Multiple entries are supported. If not specified, all the outstanding
+replication tasks are tracked.
 - `healthcheck.@PLUGIN@-outstanding-tasks.periodOfTime`: The time for
 which the check needs to be successful, in order for the instance to be
-marked healthy. If the time unit is omitted it defaults to seconds.
+marked healthy. If the time unit is omitted it defaults to nanoseconds.
 Values should use common unit suffixes to express their setting:
 
 * ms, milliseconds
@@ -37,6 +40,17 @@
 * m, min, minute, minutes
 * h, hr, hour, hours
 
+Default is 10s.
+
+This example config will report the node healthy once there have been no
+pending or in-flight tasks for the `foo` and `bar/baz` repos for at least
+5 seconds in a row, counted from the first check that finds none.
+```
+[healthcheck "@PLUGIN@-outstanding-tasks"]
+    projects = foo
+    projects = bar/baz
+    periodOfTime = 5 sec
+```
 
 Useful information
 ------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
index 226e509..346b7fa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationHealthCheckIT.java
@@ -19,6 +19,7 @@
 import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.HEALTHCHECK_NAME_SUFFIX;
 import static com.googlesource.gerrit.plugins.replication.pull.health.PullReplicationTasksHealthCheck.PERIOD_OF_TIME_FIELD;
 
+import com.google.common.base.Stopwatch;
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
@@ -41,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -56,6 +58,8 @@
         "com.googlesource.gerrit.plugins.replication.pull.PullReplicationHealthCheckIT$PullReplicationHealthCheckTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationHealthCheckIT extends PullReplicationSetupBase {
+  private static final int TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC = 5;
+  private static final String TEST_PLUGIN_NAME = "pull-replication";
 
   public static class PullReplicationHealthCheckTestModule extends AbstractModule {
     private final PullReplicationModule pullReplicationModule;
@@ -151,6 +155,11 @@
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.setBoolean("gerrit", null, "autoReload", true);
     config.setInt("replication", null, "maxApiPayloadSize", 1);
+    config.setString(
+        HealthCheckConfig.HEALTHCHECK,
+        TEST_PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX,
+        PERIOD_OF_TIME_FIELD,
+        TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC + " sec");
     config.save();
   }
 
@@ -158,6 +167,7 @@
   @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReportInstanceHealthyWhenThereAreNoOutstandingReplicationTasks()
       throws Exception {
+    Stopwatch testStartStopwatch = Stopwatch.createUnstarted();
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
     PullReplicationTasksHealthCheck healthcheck =
         getInstance(PullReplicationTasksHealthCheck.class);
@@ -182,20 +192,29 @@
 
     waitUntil(
         () -> {
+          boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
           boolean replicationFinished = hasReplicationFinished();
           if (!replicationFinished) {
-            boolean healthCheckPassed = healthcheck.run().result == HealthCheck.Result.PASSED;
-
             assertWithMessage("Instance reported healthy while waiting for replication to finish")
                 // Racy condition: we need to make sure that this isn't a false alarm
                 // and accept the case when the replication finished between the
                 // if(!replicationFinished) check and the assertion here
                 .that(!healthCheckPassed || hasReplicationFinished())
                 .isTrue();
+          } else {
+            if (!testStartStopwatch.isRunning()) {
+              testStartStopwatch.start();
+            }
           }
           return replicationFinished;
         });
 
+    if (testStartStopwatch.elapsed(TimeUnit.SECONDS) < TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC) {
+      assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.FAILED);
+    }
+
+    waitUntil(
+        () -> testStartStopwatch.elapsed(TimeUnit.SECONDS) > TEST_HEALTHCHECK_PERIOD_OF_TIME_SEC);
     assertThat(healthcheck.run().result).isEqualTo(HealthCheck.Result.PASSED);
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
index 6c5b537..6dfbc03 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/health/PullReplicationTasksHealthCheckTest.java
@@ -21,6 +21,9 @@
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Ticker;
+import com.google.common.testing.FakeTicker;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.metrics.DisabledMetricMaker;
@@ -35,9 +38,11 @@
 import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.IntStream;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,8 +54,12 @@
 public class PullReplicationTasksHealthCheckTest {
   private static final String PLUGIN_NAME = "pull-replication";
   private static final String SECTION_NAME = PLUGIN_NAME + HEALTHCHECK_NAME_SUFFIX;
+  private static final String ZERO_PERIOD_OF_TIME = "0 sec";
+  private static final Optional<String> NO_PROJECT_FILTER = Optional.empty();
 
-  private final int periodOfCheckSec = 10;
+  private final int periodOfTimeMillis = 10;
+  private final String periodOfTimeMillisStr = periodOfTimeMillis + " ms";
+  private final FakeTicker fakeTicker = new FakeTicker();
   @Mock private SourcesCollection sourcesCollection;
 
   @Mock private Source source;
@@ -64,29 +73,28 @@
   @Test
   public void shouldReadConfig() {
     List<String> projectsToCheck = List.of("foo", "bar/baz");
-    Injector injector = testInjector(new TestModule(projectsToCheck, periodOfCheckSec + " sec"));
+    Injector injector = testInjector(new TestModule(projectsToCheck, periodOfTimeMillisStr));
 
     PullReplicationTasksHealthCheck check =
         injector.getInstance(PullReplicationTasksHealthCheck.class);
 
     assertThat(check.getProjects()).containsExactlyElementsIn(projectsToCheck);
-    assertThat(check.getPeriodOfTimeSec()).isEqualTo(periodOfCheckSec);
+    assertThat(check.getPeriodOfTimeNanos())
+        .isEqualTo(TimeUnit.MILLISECONDS.toNanos(periodOfTimeMillis));
   }
 
   @Test
   public void shouldOnlyCheckTasksForReposThatMatchTheRepoFilter() {
+    int numIterations = 3;
     String repo = "foo";
     when(source.zeroPendingTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
     when(source.zeroInflightTasksForRepo(Project.nameKey(repo))).thenReturn(false).thenReturn(true);
     lenient().when(source.zeroInflightTasksForRepo(Project.nameKey("ignored"))).thenReturn(false);
     lenient().when(source.pendingTasksCount()).thenReturn(10L);
 
-    Injector injector = testInjector(new TestModule(List.of(repo), periodOfCheckSec + " sec"));
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(Optional.of(repo));
 
-    PullReplicationTasksHealthCheck check =
-        injector.getInstance(PullReplicationTasksHealthCheck.class);
-
-    List<HealthCheck.Result> checkResults = runNTimes(3, check);
+    List<HealthCheck.Result> checkResults = runNTimes(numIterations, check);
     assertThat(checkResults)
         .containsExactly(
             HealthCheck.Result.FAILED, HealthCheck.Result.FAILED, HealthCheck.Result.PASSED);
@@ -94,13 +102,10 @@
 
   @Test
   public void shouldCheckAllOutstandingTasksWhenRepoFilterIsNotConfigured() {
-    List<String> noRepoFilter = List.of();
     when(source.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
     when(source.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
 
-    Injector injector = testInjector(new TestModule(noRepoFilter, periodOfCheckSec + " sec"));
-    PullReplicationTasksHealthCheck check =
-        injector.getInstance(PullReplicationTasksHealthCheck.class);
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
 
     List<HealthCheck.Result> checkResults = runNTimes(3, check);
     assertThat(checkResults)
@@ -116,9 +121,7 @@
     when(anotherSource.pendingTasksCount()).thenReturn(1L).thenReturn(0L);
     when(anotherSource.inflightTasksCount()).thenReturn(1L).thenReturn(0L);
 
-    Injector injector = testInjector(new TestModule(List.of(), periodOfCheckSec + " sec"));
-    PullReplicationTasksHealthCheck check =
-        injector.getInstance(PullReplicationTasksHealthCheck.class);
+    PullReplicationTasksHealthCheck check = newPullReplicationTasksHealthCheck(NO_PROJECT_FILTER);
 
     List<HealthCheck.Result> checkResults = runNTimes(5, check);
     assertThat(checkResults)
@@ -130,17 +133,88 @@
             HealthCheck.Result.PASSED);
   }
 
-  private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
-    List<HealthCheck.Result> results = new ArrayList<>();
-    IntStream.rangeClosed(1, nTimes).mapToObj(i -> check.run().result).forEach(results::add);
+  @Test
+  public void shouldFailIfCheckDoesNotReportHealthyConsistentlyOverPeriodOfTime() {
+    long healthCheckPeriodOfTimeMsec = 50L;
+    String healthCheckPeriodOfTime = healthCheckPeriodOfTimeMsec + " ms";
+    int checkInterations = 3;
+    Duration checkInterval =
+        Duration.ofMillis(healthCheckPeriodOfTimeMsec).dividedBy(checkInterations);
+    long fakeTimerStartNanos = fakeTicker.read();
+    when(source.pendingTasksCount()).thenReturn(0L).thenReturn(1L).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(0L);
 
-    return results;
+    Injector injector = testInjector(new TestModule(List.of(), healthCheckPeriodOfTime));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    List<HealthCheck.Result> checkResults =
+        runNTimes(checkInterations, check, () -> fakeTicker.advance(checkInterval));
+
+    assertThat(checkResults).doesNotContain(HealthCheck.Result.PASSED);
+    assertThat(fakeTicker.read())
+        .isAtLeast(Duration.ofMillis(periodOfTimeMillis).plusNanos(fakeTimerStartNanos).toNanos());
+  }
+
+  @Test
+  public void shouldFailOnFirstInvocationEvenIfThereAreNoOutstandingTasksAndANonZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+
+    Injector injector = testInjector(new TestModule(List.of(), periodOfTimeMillisStr));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.FAILED);
+  }
+
+  @Test
+  public void
+      shouldReportHealthyOnFirstInvocationIfThereAreNoOutstandingTasksAndAZeroPeriodOfTime() {
+    mockSourceWithNoOutstandingTasks();
+
+    Injector injector = testInjector(new TestModule(List.of(), ZERO_PERIOD_OF_TIME));
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+
+    assertThat(check.run().result).isEqualTo(HealthCheck.Result.PASSED);
   }
 
   private Injector testInjector(AbstractModule testModule) {
     return Guice.createInjector(new HealthCheckExtensionApiModule(), testModule);
   }
 
+  private List<HealthCheck.Result> runNTimes(int nTimes, PullReplicationTasksHealthCheck check) {
+    return runNTimes(nTimes, check, null);
+  }
+
+  private PullReplicationTasksHealthCheck newPullReplicationTasksHealthCheck(
+      Optional<String> projectNameToCheck) {
+    Injector injector =
+        testInjector(new TestModule(projectNameToCheck.stream().toList(), ZERO_PERIOD_OF_TIME));
+
+    PullReplicationTasksHealthCheck check =
+        injector.getInstance(PullReplicationTasksHealthCheck.class);
+    return check;
+  }
+
+  private List<HealthCheck.Result> runNTimes(
+      int nTimes, PullReplicationTasksHealthCheck check, @Nullable Runnable postRunFunc) {
+    List<HealthCheck.Result> results = new ArrayList<>();
+    for (int i = 0; i < nTimes; i++) {
+      results.add(check.run().result);
+      if (postRunFunc != null) {
+        postRunFunc.run();
+      }
+    }
+
+    return results;
+  }
+
+  private void mockSourceWithNoOutstandingTasks() {
+    when(source.pendingTasksCount()).thenReturn(0L);
+    when(source.inflightTasksCount()).thenReturn(0L);
+  }
+
   private class TestModule extends AbstractModule {
     Config config;
     MergedConfigResource configResource;
@@ -175,6 +249,7 @@
       bind(HealthCheckConfig.class).toInstance(healthCheckConfig);
       bind(String.class).annotatedWith(PluginName.class).toInstance(PLUGIN_NAME);
       bind(SourcesCollection.class).toInstance(sourcesCollection);
+      bind(Ticker.class).toInstance(fakeTicker);
     }
   }
 }