Expose replication lag metrics
Calculate difference between local version of the repository
and the one stored in the shared DB. This represents the replication
lag of the consumers.
Feature: Issue 12321
Change-Id: I68b334e3eac48b577ebc68435ffe3c8980e723b5
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 6e95e66..c10e32a 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,8 +9,8 @@
maven_jar(
name = "global-refdb",
- artifact = "com.gerritforge:global-refdb:0.1.1",
- sha1 = "d6ab59906db7b20a52c8994502780b2a6ab23872",
+ artifact = "com.gerritforge:global-refdb:3.0.2",
+ sha1 = "293a807bd82a284c215213b442b3930258e01f5e"
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
index 9d55e43..356bd69 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
@@ -21,6 +21,7 @@
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.client.Project;
import com.google.inject.Inject;
+import java.util.Optional;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -59,6 +60,12 @@
}
+  /**
+   * Generic compare-and-put is not implemented by this wrapper; the call never reaches the
+   * underlying shared ref-database.
+   *
+   * @param project project owning the ref (unused).
+   * @param refName name of the ref (unused).
+   * @param currValue expected current value (unused).
+   * @param newValue value to store (unused).
+   * @throws UnsupportedOperationException always.
+   */
   @Override
+  public <T> boolean compareAndPut(Project.NameKey project, String refName, T currValue, T newValue)
+      throws GlobalRefDbSystemError {
+    // Carry a message so the failure is diagnosable from the stack trace alone.
+    throw new UnsupportedOperationException(
+        "compareAndPut with generic values is not supported by SharedRefDatabaseWrapper");
+  }
+
+ @Override
public AutoCloseable lockRef(Project.NameKey project, String refName)
throws GlobalRefDbLockException {
AutoCloseable locker = sharedRefDb().lockRef(project, refName);
@@ -77,6 +84,12 @@
sharedRefLogger.logProjectDelete(project.get());
}
+  /**
+   * Looks up the value stored for a ref by delegating to the shared ref-database currently held
+   * by the dynamic item.
+   *
+   * @param nameKey project owning the ref.
+   * @param s name of the ref to look up.
+   * @param clazz type the stored value is requested as.
+   * @return whatever the delegate returns for this lookup.
+   * @throws GlobalRefDbSystemError propagated from the delegate.
+   */
+  @Override
+  public <T> Optional<T> get(Project.NameKey nameKey, String s, Class<T> clazz)
+      throws GlobalRefDbSystemError {
+    GlobalRefDatabase delegate = sharedRefDb();
+    Optional<T> storedValue = delegate.get(nameKey, s, clazz);
+    return storedValue;
+  }
+
private GlobalRefDatabase sharedRefDb() {
return sharedRefDbDynamicItem.get();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index ec6072c..816edc9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -74,6 +74,7 @@
msgLog.log(Direction.CONSUME, topic, event);
eventRouter.route(event.getEvent());
subscriberMetrics.incrementSubscriberConsumedMessage();
+ subscriberMetrics.updateReplicationStatusMetrics(event);
} catch (IOException e) {
logger.atSevere().withCause(e).log(
"Malformed event '%s': [Exception: %s]", event.getHeader());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 3a7cce7..fc29e8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,25 +14,50 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.metrics.Counter1;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.Field;
import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
@Singleton
public class SubscriberMetrics {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
private static final String SUBSCRIBER_FAILURE_COUNTER =
"subscriber_msg_consumer_failure_counter";
+ private static final String PROJECT_REPLICATION_LAG_MS_PREFIX =
+ "multi_site/subscriber/subscriber_replication_status/ms_behind_for_";
private final Counter1<String> subscriberSuccessCounter;
private final Counter1<String> subscriberFailureCounter;
- @Inject
- public SubscriberMetrics(MetricMaker metricMaker) {
+ private MetricRegistry metricRegistry;
+ private MetricMaker metricMaker;
+ public Map<String, Long> replicationStatusPerProject = new HashMap<>();
+ private ProjectVersionRefUpdate projectVersionRefUpdate;
+
+ @Inject
+ public SubscriberMetrics(
+ MetricMaker metricMaker,
+ MetricRegistry metricRegistry,
+ ProjectVersionRefUpdate projectVersionRefUpdate) {
+
+ this.projectVersionRefUpdate = projectVersionRefUpdate;
+ this.metricMaker = metricMaker;
+ this.metricRegistry = metricRegistry;
this.subscriberSuccessCounter =
metricMaker.newCounter(
"multi_site/subscriber/subscriber_message_consumer_counter",
@@ -57,4 +82,43 @@
public void incrementSubscriberFailedToConsumeMessage() {
subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
}
+
+  /**
+   * Recomputes the replication lag of a project when a ref-replicated event has been consumed.
+   *
+   * <p>The lag is the remote (shared ref-database) project version minus the local one. It is
+   * stored in {@code replicationStatusPerProject} and the per-project metric is registered if
+   * needed. Any other event type is logged and ignored; when either version is unavailable the
+   * metric is left untouched and a warning is logged.
+   *
+   * @param eventMessage the consumed message wrapping the event to inspect.
+   */
+  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
+    Event consumedEvent = eventMessage.getEvent();
+    if (!(consumedEvent instanceof RefReplicatedEvent)) {
+      logger.atInfo().log("Not a ref-replicated-event event [%s], skipping", consumedEvent.type);
+      return;
+    }
+
+    String projectName = ((RefReplicatedEvent) consumedEvent).getProjectNameKey().get();
+    logger.atFine().log("Updating replication lag for %s", projectName);
+
+    // The remote version is looked up first, then the local one.
+    Optional<Long> sharedVersion = projectVersionRefUpdate.getProjectRemoteVersion(projectName);
+    Optional<Long> ownVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName);
+    if (!sharedVersion.isPresent() || !ownVersion.isPresent()) {
+      logger.atWarning().log(
+          "Didn't update metric for %s. Local [%b] or remote [%b] version is not defined",
+          projectName, ownVersion.isPresent(), sharedVersion.isPresent());
+      return;
+    }
+
+    Long lag = sharedVersion.get() - ownVersion.get();
+    logger.atFine().log("Calculated lag for project '%s' [%d]", projectName, lag);
+    replicationStatusPerProject.put(projectName, lag);
+    upsertMetricsForProject(projectName);
+  }
+
+  /**
+   * Registers the replication-lag callback gauge of a project unless one is already registered.
+   *
+   * <p>The gauge reads its value from {@code replicationStatusPerProject} each time it is sampled.
+   *
+   * @param projectName project the gauge reports on; it is appended to the metric name prefix.
+   */
+  private void upsertMetricsForProject(String projectName) {
+    String metricName = PROJECT_REPLICATION_LAG_MS_PREFIX + projectName;
+    // Match on the END of the registered name, not on any substring: with a substring match the
+    // gauge of project "foobar" would make project "foo" look already registered. A suffix match
+    // still tolerates a prefix in front of the name (presumably the plugin prefix - confirm).
+    if (metricRegistry.getGauges(MetricFilter.endsWith(metricName)).isEmpty()) {
+      // NOTE(review): name and unit say milliseconds; confirm that the project versions being
+      // subtracted really are millisecond values.
+      metricMaker.newCallbackMetric(
+          metricName,
+          Long.class,
+          new Description(String.format("%s replication lag (ms)", metricName))
+              .setGauge()
+              .setUnit(Description.Units.MILLISECONDS),
+          () -> replicationStatusPerProject.get(projectName));
+      logger.atFine().log("Added replication lag callback metric for '%s'", projectName);
+    } else {
+      logger.atFine().log("Don't add metric since it already exists for project '%s'", projectName);
+    }
+  }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index a4866ea..bcdf63a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -18,22 +18,29 @@
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.google.common.base.CharMatcher;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
+import com.google.common.primitives.Ints;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.IntBlob;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
import java.io.IOException;
+import java.util.Optional;
import java.util.Set;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdRef;
import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
@@ -41,6 +48,7 @@
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.RemoteRefUpdate;
+@Singleton
public class ProjectVersionRefUpdate implements EventListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public static final String MULTI_SITE_VERSIONING_REF = "refs/multi-site/version";
@@ -194,6 +202,56 @@
}
}
+  /**
+   * Reads the version of a project from the local repository.
+   *
+   * @param projectName name of the project to inspect.
+   * @return the integer stored under {@code MULTI_SITE_VERSIONING_REF}, widened as an unsigned
+   *     value, or empty when no value can be parsed from that ref or the repository cannot be
+   *     read.
+   */
+  public Optional<Long> getProjectLocalVersion(String projectName) {
+    Project.NameKey projectKey = Project.NameKey.parse(projectName);
+    try (Repository localRepo = gitRepositoryManager.openRepository(projectKey)) {
+      Optional<IntBlob> versionBlob = IntBlob.parse(localRepo, MULTI_SITE_VERSIONING_REF);
+      if (!versionBlob.isPresent()) {
+        return Optional.empty();
+      }
+      Long localVersion = Integer.toUnsignedLong(versionBlob.get().value());
+      logger.atInfo().log("Local project '%s' has version %d", projectName, localVersion);
+      return Optional.of(localVersion);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Cannot read local project '%s' version", projectName);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Reads the version of a project as recorded in the shared ref-database.
+   *
+   * <p>The shared ref-database supplies the object id stored for {@code
+   * MULTI_SITE_VERSIONING_REF}; that id is then resolved to a number through {@code
+   * getLongFromObjectId}.
+   *
+   * @param projectName name of the project to inspect.
+   * @return the remote version, or empty when the shared ref-database has no entry or the object
+   *     cannot be resolved.
+   */
+  public Optional<Long> getProjectRemoteVersion(String projectName) {
+    Project.NameKey projectKey = Project.NameKey.parse(projectName);
+    Optional<ObjectId> sharedVersionId =
+        sharedRefDb.get(projectKey, MULTI_SITE_VERSIONING_REF, ObjectId.class);
+    if (!sharedVersionId.isPresent()) {
+      logger.atSevere().log("Didn't find remote version for %s", projectName);
+      return Optional.empty();
+    }
+    return getLongFromObjectId(projectName, sharedVersionId.get());
+  }
+
+  /**
+   * Resolves an object id to the integer stored in the blob it points to.
+   *
+   * <p>The object is read from the local repository of the project, so it has to be present there.
+   *
+   * @param projectName project whose local repository is used to load the object.
+   * @param objectId id of the blob expected to contain a decimal integer.
+   * @return the blob content widened as an unsigned value, or empty when the object is not a
+   *     blob, its content is not an integer, or it cannot be read.
+   */
+  private Optional<Long> getLongFromObjectId(String projectName, ObjectId objectId) {
+    // Both the repository and the reader hold resources, so both are closed on exit.
+    try (Repository repository =
+            gitRepositoryManager.openRepository(Project.NameKey.parse(projectName));
+        ObjectReader or = repository.newObjectReader()) {
+      ObjectLoader ol = or.open(objectId, OBJ_BLOB);
+      if (ol.getType() != OBJ_BLOB) {
+        // In theory this should be thrown by open but not all implementations may do it properly
+        // (certainly InMemoryRepository doesn't).
+        logger.atSevere().log("Incorrect object type loaded for objectId %s", objectId.toString());
+        return Optional.empty();
+      }
+      String str = CharMatcher.whitespace().trimFrom(new String(ol.getCachedBytes(), UTF_8));
+      Integer value = Ints.tryParse(str);
+      if (value == null) {
+        // tryParse reports non-numeric content with null; unboxing it below would throw.
+        logger.atSevere().log(
+            "Cannot parse version of project %s from objectId %s: content is not an integer",
+            projectName, objectId.toString());
+        return Optional.empty();
+      }
+      logger.atInfo().log(
+          "Found remote version for project %s, value: %s - %d",
+          projectName, objectId.toString(), value);
+      return Optional.of(Integer.toUnsignedLong(value));
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Cannot parse objectId %s", objectId.toString());
+      return Optional.empty();
+    }
+  }
+
private ObjectId updateLocalProjectVersion(Project.NameKey projectNameKey, String refName)
throws LocalProjectVersionUpdateException {
Long lastRefUpdatedTimestamp = getLastRefUpdatedTimestamp(projectNameKey, refName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
index aa15108..ef8d9b8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
@@ -18,6 +18,7 @@
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
import com.google.gerrit.reviewdb.client.Project;
+import java.util.Optional;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -35,6 +36,12 @@
}
+ /** No-op implementation: ignores all arguments, stores nothing and always returns false. */
@Override
+ public <T> boolean compareAndPut(Project.NameKey project, String refName, T currValue, T newValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
public AutoCloseable lockRef(Project.NameKey project, String refName)
throws GlobalRefDbLockException {
return () -> {};
@@ -47,4 +54,10 @@
@Override
public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+ /** No-op implementation: ignores all arguments and always returns an empty Optional. */
+ @Override
+ public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+ throws GlobalRefDbSystemError {
+ return Optional.empty();
+ }
}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 42249b1..c1135bb 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -93,10 +93,6 @@
`metric=plugins/multi-site/multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
-* Subscriber replication status (latest replication Epoch time in seconds) per instance
+* Subscriber replication status per project (ms behind the producer)
-`metric=plugins/multi-site/multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
-
-* Subscriber replication status (latest replication Epoch time in seconds) per project
-
-`metric=plugins/multi-site/multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+`metric=plugins/multi-site/multi_site/subscriber/subscriber_replication_status/ms_behind_for_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index ff1c31c..7d38ac1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.multisite.validation;
import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@@ -31,6 +32,7 @@
import com.googlesource.gerrit.plugins.replication.ReplicationState;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
import org.eclipse.jgit.junit.TestRepository;
@@ -86,7 +88,7 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
verify(sharedRefDb, atMost(1))
.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
@@ -107,7 +109,7 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNull();
}
@@ -120,7 +122,7 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNull();
}
@@ -137,7 +139,7 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNotNull();
verify(sharedRefDb, never())
@@ -163,7 +165,7 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNull();
}
@@ -181,7 +183,47 @@
new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
- Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNull();
}
+
+  /**
+   * The remote version must resolve to the commit time of the master commit when the shared
+   * ref-database returns the object id of the local versioning ref.
+   */
+  @Test
+  public void getRemoteProjectVersionShouldReturnCorrectValue() throws IOException {
+    updateLocalVersion();
+    Ref localVersionRef = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+    when(sharedRefDb.get(A_TEST_PROJECT_NAME_KEY, MULTI_SITE_VERSIONING_REF, ObjectId.class))
+        .thenReturn(Optional.of(localVersionRef.getObjectId()));
+    ProjectVersionRefUpdate objectUnderTest =
+        new ProjectVersionRefUpdate(repoManager, sharedRefDb);
+
+    Optional<Long> remoteVersion = objectUnderTest.getProjectRemoteVersion(A_TEST_PROJECT_NAME);
+
+    assertThat(remoteVersion.isPresent()).isTrue();
+    assertThat(remoteVersion.get()).isEqualTo(masterCommit.getCommitTime());
+  }
+
+  /**
+   * After the local version has been updated, the local project version must equal the commit
+   * time of the master commit.
+   */
+  @Test
+  public void getLocalProjectVersionShouldReturnCorrectValue() throws IOException {
+    updateLocalVersion();
+
+    Optional<Long> version =
+        new ProjectVersionRefUpdate(repoManager, sharedRefDb)
+            .getProjectLocalVersion(A_TEST_PROJECT_NAME);
+
+    assertThat(version.isPresent()).isTrue();
+    assertThat(version.get()).isEqualTo(masterCommit.getCommitTime());
+  }
+
+  /**
+   * Marks the current context as handling a forwarded event and feeds a successful
+   * ref-replicated event for the test project and ref to a fresh {@code ProjectVersionRefUpdate}.
+   */
+  private void updateLocalVersion() {
+    Context.setForwardedEvent(true);
+    ProjectVersionRefUpdate versionUpdater =
+        new ProjectVersionRefUpdate(repoManager, sharedRefDb);
+    versionUpdater.onEvent(
+        new RefReplicatedEvent(
+            A_TEST_PROJECT_NAME,
+            A_TEST_REF_NAME,
+            "targetNode",
+            ReplicationState.RefPushResult.SUCCEEDED,
+            RemoteRefUpdate.Status.OK));
+  }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
index 087259a..e2de613 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -31,6 +31,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdRef;
@@ -118,6 +119,13 @@
}
+ // Test stub: the generic compare-and-put ignores its arguments and always returns false.
@Override
+ public <T> boolean compareAndPut(
+ Project.NameKey project, String refName, T currValue, T newValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
public AutoCloseable lockRef(Project.NameKey project, String refName)
throws GlobalRefDbLockException {
return null;
@@ -125,6 +133,12 @@
@Override
public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+ // Test stub: lookups ignore their arguments and always return an empty Optional.
+ @Override
+ public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+ throws GlobalRefDbSystemError {
+ return Optional.empty();
+ }
};
return new SharedRefDatabaseWrapper(
DynamicItem.itemOf(GlobalRefDatabase.class, sharedRefDatabase),