Avoid dropping stats collection tasks

The current back-pressure mechanism for stats collection incorrectly
drops tasks.

Instead of discarding tasks, the logic should compact multiple pending
tasks into a single one, reducing pressure on the filesystem while still
ensuring data is collected.

Bug: Issue 410584024
Change-Id: Ie82379e33041c67ee41932e62c0facf04a3b43ae
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
index 2001c56..95e736d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
@@ -18,7 +18,6 @@
 import static java.util.stream.Collectors.toList;
 
 import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.flogger.FluentLogger;
@@ -29,7 +28,6 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitRepoMetric;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.MetricsCollector;
-import java.time.Clock;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -43,22 +41,17 @@
   private final MetricMaker metricMaker;
   private final MetricRegistry metricRegistry;
   private final Set<String> projects;
-  private Map<String, Long> collectedAt;
-  private final long gracePeriodMs;
   private final boolean collectAllRepositories;
   private ImmutableList<GitRepoMetric> metricsNames;
   private DynamicSet<MetricsCollector> collectors;
   private Set<String> staleStatsProjects;
 
-  private final Clock clock;
-
-  @VisibleForTesting
+  @Inject
   GitRepoMetricsCache(
       DynamicSet<MetricsCollector> collectors,
       MetricMaker metricMaker,
       MetricRegistry metricRegistry,
-      GitRepoMetricsConfig config,
-      Clock clock) {
+      GitRepoMetricsConfig config) {
     this.collectors = collectors;
     this.metricMaker = metricMaker;
     this.metricRegistry = metricRegistry;
@@ -70,22 +63,10 @@
 
     this.projects = new HashSet<>(config.getRepositoryNames());
     this.metrics = Maps.newHashMap();
-    this.collectedAt = Maps.newHashMap();
-    this.clock = clock;
-    this.gracePeriodMs = config.getGracePeriodMs();
     this.collectAllRepositories = config.collectAllRepositories();
     this.staleStatsProjects = ConcurrentHashMap.newKeySet();
   }
 
-  @Inject
-  GitRepoMetricsCache(
-      DynamicSet<MetricsCollector> collectors,
-      MetricMaker metricMaker,
-      MetricRegistry metricRegistry,
-      GitRepoMetricsConfig config) {
-    this(collectors, metricMaker, metricRegistry, config, Clock.systemDefaultZone());
-  }
-
   public Map<String, Long> getMetrics() {
     return metrics;
   }
@@ -102,7 +83,6 @@
             createNewCallbackMetric(repoMetric, projectName);
           }
         });
-    collectedAt.put(projectName, clock.millis());
   }
 
   private boolean metricExists(String metricName) {
@@ -130,27 +110,11 @@
     return collectors;
   }
 
-  @VisibleForTesting
-  public Map<String, Long> getCollectedAt() {
-    return collectedAt;
-  }
-
   public static String getMetricName(String metricName, String projectName) {
     return String.format("%s_%s", metricName, projectName).toLowerCase(Locale.ROOT);
   }
 
   public boolean shouldCollectStats(String projectName) {
-    long lastCollectionTime = collectedAt.getOrDefault(projectName, 0L);
-    long currentTimeMs = System.currentTimeMillis();
-    boolean doCollectStats = lastCollectionTime + gracePeriodMs <= currentTimeMs;
-    if (!doCollectStats) {
-      logger.atFine().log(
-          "Skip stats collection for %s (grace period: %d, last collection time: %d, current time:"
-              + " %d",
-          projectName, gracePeriodMs, lastCollectionTime, currentTimeMs);
-      return false;
-    }
-
     return (collectAllRepositories || projects.contains(projectName))
         && !staleStatsProjects.contains(projectName);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
index 470232b..850b008 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
@@ -32,17 +32,20 @@
   private final UpdateGitMetricsTask.Factory updateGitMetricsTaskFactory;
   private final GitRepoMetricsCache gitRepoMetricsCache;
   private final String instanceId;
+  private final ProjectMetricsLimiter projectMetricsLimiter;
 
   @Inject
   protected GitRepoUpdateListener(
       @GerritInstanceId String instanceId,
       @UpdateGitMetricsExecutor ScheduledExecutorService executor,
       UpdateGitMetricsTask.Factory updateGitMetricsTaskFactory,
-      GitRepoMetricsCache gitRepoMetricsCache) {
+      GitRepoMetricsCache gitRepoMetricsCache,
+      ProjectMetricsLimiter projectMetricsLimiter) {
     this.instanceId = instanceId;
     this.executor = executor;
     this.updateGitMetricsTaskFactory = updateGitMetricsTaskFactory;
     this.gitRepoMetricsCache = gitRepoMetricsCache;
+    this.projectMetricsLimiter = projectMetricsLimiter;
   }
 
   @Override
@@ -58,6 +61,7 @@
         gitRepoMetricsCache.setStale(projectName);
         executor.execute(
             () -> {
+              projectMetricsLimiter.acquire(projectName);
               gitRepoMetricsCache.unsetStale(projectName);
               updateGitMetricsTask.run();
             });
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
index 27ad5c5..b8d390e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
@@ -51,6 +51,10 @@
       listener().to(MetricsInitializer.class);
     }
 
+    if (config.getGracePeriodMs() > 0) {
+      bind(ProjectMetricsLimiter.class).to(ProjectMetricsThrottler.class).in(Scopes.SINGLETON);
+    }
+
     DynamicSet.setOf(binder(), MetricsCollector.class);
     DynamicSet.bind(binder(), MetricsCollector.class).to(GitStatsMetricsCollector.class);
     DynamicSet.bind(binder(), MetricsCollector.class).to(FSMetricsCollector.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java
new file mode 100644
index 0000000..a275a66
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ * A limiter interface for controlling the collection of Git repository metrics per project.
+ *
+ * <p>Implementations of this interface can apply throttling policy to regulate how often metrics
+ * collection tasks are run concurrently for a given project.
+ *
+ * <p>By default, this interface is implemented by {@link ProjectMetricsUnlimited}, which imposes no
+ * restrictions.
+ */
+@ImplementedBy(ProjectMetricsUnlimited.class)
+public interface ProjectMetricsLimiter {
+  /**
+   * Acquires permission to collect metrics for the given project.
+   *
+   * <p>This method may block, if the metrics collection for the specified project is currently
+   * throttled.
+   *
+   * @param projectName the name of the project for which metrics collection is being triggered
+   */
+  void acquire(String projectName);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java
new file mode 100644
index 0000000..96c2532
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+
+class ProjectMetricsThrottler implements ProjectMetricsLimiter {
+  private double rate;
+
+  @Inject
+  ProjectMetricsThrottler(GitRepoMetricsConfig repoMetricsConfig) {
+    this.rate = (double) 1000L / repoMetricsConfig.getGracePeriodMs();
+  }
+
+  private ConcurrentHashMap<String, RateLimiter> projectsRateLimiters = new ConcurrentHashMap<>();
+
+  @Override
+  public void acquire(String projectName) {
+    projectsRateLimiters.computeIfAbsent(projectName, (p) -> RateLimiter.create(rate)).acquire();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java
new file mode 100644
index 0000000..e5bc999
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java
@@ -0,0 +1,21 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+class ProjectMetricsUnlimited implements ProjectMetricsLimiter {
+
+  @Override
+  public void acquire(String projectName) {}
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
index 1ca7b81..acf6a4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
@@ -23,9 +23,6 @@
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitRepoMetric;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.MetricsCollector;
 import java.io.IOException;
-import java.time.Clock;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -123,53 +120,6 @@
     assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isTrue();
   }
 
-  @Test
-  public void shouldSkipCollectionWhenGracePeriodIsNotExpired() throws IOException {
-    ConfigSetupUtils configSetupUtils =
-        new ConfigSetupUtils(Collections.singletonList(enabledRepo), "5 m");
-    gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
-    gitRepoMetricsCache =
-        new GitRepoMetricsCache(ds, fakeMetricMaker, new MetricRegistry(), gitRepoMetricsConfig);
-
-    gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
-    assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isFalse();
-  }
-
-  @Test
-  public void shouldCollectStatsWhenGracePeriodIsExpired() throws IOException {
-    ConfigSetupUtils configSetupUtils =
-        new ConfigSetupUtils(Collections.singletonList(enabledRepo), "1 s");
-    gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
-    gitRepoMetricsCache =
-        new GitRepoMetricsCache(
-            ds,
-            fakeMetricMaker,
-            metricRegistry,
-            gitRepoMetricsConfig,
-            Clock.fixed(
-                Instant.now().minus(2, ChronoUnit.SECONDS), Clock.systemDefaultZone().getZone()));
-
-    gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
-    assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isTrue();
-  }
-
-  @Test
-  public void shouldSetCollectionTime() throws IOException {
-    ConfigSetupUtils configSetupUtils =
-        new ConfigSetupUtils(Collections.singletonList(enabledRepo));
-    gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
-    gitRepoMetricsCache =
-        new GitRepoMetricsCache(ds, fakeMetricMaker, metricRegistry, gitRepoMetricsConfig);
-
-    long currentTimeStamp = System.currentTimeMillis();
-
-    gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
-    assertThat(gitRepoMetricsCache.getCollectedAt().get(enabledRepo)).isAtLeast(currentTimeStamp);
-  }
-
   private HashMap<GitRepoMetric, Long> getCollectedMetrics() {
     return Maps.newHashMap(
         ImmutableMap.of(new GitRepoMetric("anyMetrics", "anyMetric description", "Count"), 1L));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
index 1a3fced..e8aa086 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
@@ -16,12 +16,14 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.googlesource.gerrit.plugins.gitrepometrics.GitRepoUpdateListener.REF_REPLICATED_EVENT_SUFFIX;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verifyNoInteractions;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.gerrit.acceptance.WaitUtil;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -38,13 +40,17 @@
 import com.google.inject.Injector;
 import com.google.inject.TypeLiteral;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 public class GitUpdateListenerTest {
+  private final int MAX_WAIT_TIME_FOR_METRICS_SECS = 5;
   private final GitRepositoryManager repoManager = new InMemoryRepositoryManager();
   private final ScheduledExecutorService mockedExecutorService =
       mock(ScheduledExecutorService.class);
@@ -96,7 +102,8 @@
             producerInstanceId,
             mockedExecutorService,
             updateGitMetricsTaskFactory,
-            gitRepoMetricsCache);
+            gitRepoMetricsCache,
+            new ProjectMetricsUnlimited());
   }
 
   @Test
@@ -153,6 +160,26 @@
     assertMetricsUpdateTaskIsExecuted();
   }
 
+  @Test
+  public void shouldTriggerRateLimiter() {
+    AtomicInteger acquireCount = new AtomicInteger();
+    GitRepoUpdateListener limitedGitRepoUpdateListener =
+        new GitRepoUpdateListener(
+            producerInstanceId,
+            Executors.newSingleThreadScheduledExecutor(),
+            updateGitMetricsTaskFactory,
+            gitRepoMetricsCache,
+            (project) -> acquireCount.incrementAndGet());
+    limitedGitRepoUpdateListener.onEvent(getRefUpdatedEvent(enabledProject));
+
+    try {
+      WaitUtil.waitUntil(
+          () -> acquireCount.get() == 1, Duration.ofSeconds(MAX_WAIT_TIME_FOR_METRICS_SECS));
+    } catch (InterruptedException e) {
+      fail(String.format("Rate limiter not triggered for project %s", enabledProject));
+    }
+  }
+
   private RefUpdatedEvent getRefUpdatedEvent(String projectName) {
     return getRefUpdatedEvent(projectName, producerInstanceId);
   }