Avoid dropping stats collection tasks
The current back-pressure mechanism for stats collection incorrectly
drops tasks.
Instead of discarding tasks, the logic should compact multiple pending
tasks into a single one, reducing pressure on the filesystem while still
ensuring data is collected.
Bug: Issue 410584024
Change-Id: Ie82379e33041c67ee41932e62c0facf04a3b43ae
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
index 2001c56..95e736d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCache.java
@@ -18,7 +18,6 @@
import static java.util.stream.Collectors.toList;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.flogger.FluentLogger;
@@ -29,7 +28,6 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitRepoMetric;
import com.googlesource.gerrit.plugins.gitrepometrics.collectors.MetricsCollector;
-import java.time.Clock;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -43,22 +41,17 @@
private final MetricMaker metricMaker;
private final MetricRegistry metricRegistry;
private final Set<String> projects;
- private Map<String, Long> collectedAt;
- private final long gracePeriodMs;
private final boolean collectAllRepositories;
private ImmutableList<GitRepoMetric> metricsNames;
private DynamicSet<MetricsCollector> collectors;
private Set<String> staleStatsProjects;
- private final Clock clock;
-
- @VisibleForTesting
+ @Inject
GitRepoMetricsCache(
DynamicSet<MetricsCollector> collectors,
MetricMaker metricMaker,
MetricRegistry metricRegistry,
- GitRepoMetricsConfig config,
- Clock clock) {
+ GitRepoMetricsConfig config) {
this.collectors = collectors;
this.metricMaker = metricMaker;
this.metricRegistry = metricRegistry;
@@ -70,22 +63,10 @@
this.projects = new HashSet<>(config.getRepositoryNames());
this.metrics = Maps.newHashMap();
- this.collectedAt = Maps.newHashMap();
- this.clock = clock;
- this.gracePeriodMs = config.getGracePeriodMs();
this.collectAllRepositories = config.collectAllRepositories();
this.staleStatsProjects = ConcurrentHashMap.newKeySet();
}
- @Inject
- GitRepoMetricsCache(
- DynamicSet<MetricsCollector> collectors,
- MetricMaker metricMaker,
- MetricRegistry metricRegistry,
- GitRepoMetricsConfig config) {
- this(collectors, metricMaker, metricRegistry, config, Clock.systemDefaultZone());
- }
-
public Map<String, Long> getMetrics() {
return metrics;
}
@@ -102,7 +83,6 @@
createNewCallbackMetric(repoMetric, projectName);
}
});
- collectedAt.put(projectName, clock.millis());
}
private boolean metricExists(String metricName) {
@@ -130,27 +110,11 @@
return collectors;
}
- @VisibleForTesting
- public Map<String, Long> getCollectedAt() {
- return collectedAt;
- }
-
public static String getMetricName(String metricName, String projectName) {
return String.format("%s_%s", metricName, projectName).toLowerCase(Locale.ROOT);
}
public boolean shouldCollectStats(String projectName) {
- long lastCollectionTime = collectedAt.getOrDefault(projectName, 0L);
- long currentTimeMs = System.currentTimeMillis();
- boolean doCollectStats = lastCollectionTime + gracePeriodMs <= currentTimeMs;
- if (!doCollectStats) {
- logger.atFine().log(
- "Skip stats collection for %s (grace period: %d, last collection time: %d, current time:"
- + " %d",
- projectName, gracePeriodMs, lastCollectionTime, currentTimeMs);
- return false;
- }
-
return (collectAllRepositories || projects.contains(projectName))
&& !staleStatsProjects.contains(projectName);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
index 470232b..850b008 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoUpdateListener.java
@@ -32,17 +32,20 @@
private final UpdateGitMetricsTask.Factory updateGitMetricsTaskFactory;
private final GitRepoMetricsCache gitRepoMetricsCache;
private final String instanceId;
+ private final ProjectMetricsLimiter projectMetricsLimiter;
@Inject
protected GitRepoUpdateListener(
@GerritInstanceId String instanceId,
@UpdateGitMetricsExecutor ScheduledExecutorService executor,
UpdateGitMetricsTask.Factory updateGitMetricsTaskFactory,
- GitRepoMetricsCache gitRepoMetricsCache) {
+ GitRepoMetricsCache gitRepoMetricsCache,
+ ProjectMetricsLimiter projectMetricsLimiter) {
this.instanceId = instanceId;
this.executor = executor;
this.updateGitMetricsTaskFactory = updateGitMetricsTaskFactory;
this.gitRepoMetricsCache = gitRepoMetricsCache;
+ this.projectMetricsLimiter = projectMetricsLimiter;
}
@Override
@@ -58,6 +61,7 @@
gitRepoMetricsCache.setStale(projectName);
executor.execute(
() -> {
+ projectMetricsLimiter.acquire(projectName);
gitRepoMetricsCache.unsetStale(projectName);
updateGitMetricsTask.run();
});
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
index 27ad5c5..b8d390e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/Module.java
@@ -51,6 +51,10 @@
listener().to(MetricsInitializer.class);
}
+ if (config.getGracePeriodMs() > 0) {
+ bind(ProjectMetricsLimiter.class).to(ProjectMetricsThrottler.class).in(Scopes.SINGLETON);
+ }
+
DynamicSet.setOf(binder(), MetricsCollector.class);
DynamicSet.bind(binder(), MetricsCollector.class).to(GitStatsMetricsCollector.class);
DynamicSet.bind(binder(), MetricsCollector.class).to(FSMetricsCollector.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java
new file mode 100644
index 0000000..a275a66
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsLimiter.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ * A limiter interface for controlling the collection of Git repository metrics per project.
+ *
+ * <p>Implementations of this interface can apply throttling policy to regulate how often metrics
+ * collection tasks are run concurrently for a given project.
+ *
+ * <p>By default, this interface is implemented by {@link ProjectMetricsUnlimited}, which imposes no
+ * restrictions.
+ */
+@ImplementedBy(ProjectMetricsUnlimited.class)
+public interface ProjectMetricsLimiter {
+ /**
+ * Acquires permission to collect metrics for the given project.
+ *
+ * <p>This method may block, if the metrics collection for the specified project is currently
+ * throttled.
+ *
+ * @param projectName the name of the project for which metrics collection is being triggered
+ */
+ void acquire(String projectName);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java
new file mode 100644
index 0000000..96c2532
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsThrottler.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+
+class ProjectMetricsThrottler implements ProjectMetricsLimiter {
+ private double rate;
+
+ @Inject
+ ProjectMetricsThrottler(GitRepoMetricsConfig repoMetricsConfig) {
+ this.rate = (double) 1000L / repoMetricsConfig.getGracePeriodMs();
+ }
+
+ private ConcurrentHashMap<String, RateLimiter> projectsRateLimiters = new ConcurrentHashMap<>();
+
+ @Override
+ public void acquire(String projectName) {
+ projectsRateLimiters.computeIfAbsent(projectName, (p) -> RateLimiter.create(rate)).acquire();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java
new file mode 100644
index 0000000..e5bc999
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/ProjectMetricsUnlimited.java
@@ -0,0 +1,21 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.gitrepometrics;
+
+class ProjectMetricsUnlimited implements ProjectMetricsLimiter {
+
+ @Override
+ public void acquire(String projectName) {}
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
index 1ca7b81..acf6a4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheTest.java
@@ -23,9 +23,6 @@
import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitRepoMetric;
import com.googlesource.gerrit.plugins.gitrepometrics.collectors.MetricsCollector;
import java.io.IOException;
-import java.time.Clock;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -123,53 +120,6 @@
assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isTrue();
}
- @Test
- public void shouldSkipCollectionWhenGracePeriodIsNotExpired() throws IOException {
- ConfigSetupUtils configSetupUtils =
- new ConfigSetupUtils(Collections.singletonList(enabledRepo), "5 m");
- gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
- gitRepoMetricsCache =
- new GitRepoMetricsCache(ds, fakeMetricMaker, new MetricRegistry(), gitRepoMetricsConfig);
-
- gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
- assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isFalse();
- }
-
- @Test
- public void shouldCollectStatsWhenGracePeriodIsExpired() throws IOException {
- ConfigSetupUtils configSetupUtils =
- new ConfigSetupUtils(Collections.singletonList(enabledRepo), "1 s");
- gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
- gitRepoMetricsCache =
- new GitRepoMetricsCache(
- ds,
- fakeMetricMaker,
- metricRegistry,
- gitRepoMetricsConfig,
- Clock.fixed(
- Instant.now().minus(2, ChronoUnit.SECONDS), Clock.systemDefaultZone().getZone()));
-
- gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
- assertThat(gitRepoMetricsCache.shouldCollectStats(enabledRepo)).isTrue();
- }
-
- @Test
- public void shouldSetCollectionTime() throws IOException {
- ConfigSetupUtils configSetupUtils =
- new ConfigSetupUtils(Collections.singletonList(enabledRepo));
- gitRepoMetricsConfig = configSetupUtils.getGitRepoMetricsConfig();
- gitRepoMetricsCache =
- new GitRepoMetricsCache(ds, fakeMetricMaker, metricRegistry, gitRepoMetricsConfig);
-
- long currentTimeStamp = System.currentTimeMillis();
-
- gitRepoMetricsCache.setMetrics(getCollectedMetrics(), enabledRepo);
-
- assertThat(gitRepoMetricsCache.getCollectedAt().get(enabledRepo)).isAtLeast(currentTimeStamp);
- }
-
private HashMap<GitRepoMetric, Long> getCollectedMetrics() {
return Maps.newHashMap(
ImmutableMap.of(new GitRepoMetric("anyMetrics", "anyMetric description", "Count"), 1L));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
index 1a3fced..e8aa086 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitUpdateListenerTest.java
@@ -16,12 +16,14 @@
import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.gitrepometrics.GitRepoUpdateListener.REF_REPLICATED_EVENT_SUFFIX;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verifyNoInteractions;
import com.codahale.metrics.MetricRegistry;
+import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.registration.DynamicSet;
@@ -38,13 +40,17 @@
import com.google.inject.Injector;
import com.google.inject.TypeLiteral;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class GitUpdateListenerTest {
+ private final int MAX_WAIT_TIME_FOR_METRICS_SECS = 5;
private final GitRepositoryManager repoManager = new InMemoryRepositoryManager();
private final ScheduledExecutorService mockedExecutorService =
mock(ScheduledExecutorService.class);
@@ -96,7 +102,8 @@
producerInstanceId,
mockedExecutorService,
updateGitMetricsTaskFactory,
- gitRepoMetricsCache);
+ gitRepoMetricsCache,
+ new ProjectMetricsUnlimited());
}
@Test
@@ -153,6 +160,26 @@
assertMetricsUpdateTaskIsExecuted();
}
+ @Test
+ public void shouldTriggerRateLimiter() {
+ AtomicInteger acquireCount = new AtomicInteger();
+ GitRepoUpdateListener limitedGitRepoUpdateListener =
+ new GitRepoUpdateListener(
+ producerInstanceId,
+ Executors.newSingleThreadScheduledExecutor(),
+ updateGitMetricsTaskFactory,
+ gitRepoMetricsCache,
+ (project) -> acquireCount.incrementAndGet());
+ limitedGitRepoUpdateListener.onEvent(getRefUpdatedEvent(enabledProject));
+
+ try {
+ WaitUtil.waitUntil(
+ () -> acquireCount.get() == 1, Duration.ofSeconds(MAX_WAIT_TIME_FOR_METRICS_SECS));
+ } catch (InterruptedException e) {
+ fail(String.format("Rate limiter not triggered for project %s", enabledProject));
+ }
+ }
+
private RefUpdatedEvent getRefUpdatedEvent(String projectName) {
return getRefUpdatedEvent(projectName, producerInstanceId);
}