Expose replication lag metrics

Calculate difference between local version of the repository
and the one stored in the shared DB. This represents the replication
lag of the consumers.

Feaure: Issue 12321
Change-Id: I68b334e3eac48b577ebc68435ffe3c8980e723b5
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 6e95e66..c10e32a 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,8 +9,8 @@
 
     maven_jar(
         name = "global-refdb",
-        artifact = "com.gerritforge:global-refdb:0.1.1",
-        sha1 = "d6ab59906db7b20a52c8994502780b2a6ab23872",
+        artifact = "com.gerritforge:global-refdb:3.0.2",
+        sha1 = "293a807bd82a284c215213b442b3930258e01f5e"
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
index 9d55e43..356bd69 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/SharedRefDatabaseWrapper.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
+import java.util.Optional;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 
@@ -59,6 +60,12 @@
   }
 
   @Override
+  public <T> boolean compareAndPut(Project.NameKey project, String refName, T currValue, T newValue)
+      throws GlobalRefDbSystemError {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public AutoCloseable lockRef(Project.NameKey project, String refName)
       throws GlobalRefDbLockException {
     AutoCloseable locker = sharedRefDb().lockRef(project, refName);
@@ -77,6 +84,12 @@
     sharedRefLogger.logProjectDelete(project.get());
   }
 
+  @Override
+  public <T> Optional<T> get(Project.NameKey nameKey, String s, Class<T> clazz)
+      throws GlobalRefDbSystemError {
+    return sharedRefDb().get(nameKey, s, clazz);
+  }
+
   private GlobalRefDatabase sharedRefDb() {
     return sharedRefDbDynamicItem.get();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index ec6072c..816edc9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -74,6 +74,7 @@
         msgLog.log(Direction.CONSUME, topic, event);
         eventRouter.route(event.getEvent());
         subscriberMetrics.incrementSubscriberConsumedMessage();
+        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
         logger.atSevere().withCause(e).log(
             "Malformed event '%s': [Exception: %s]", event.getHeader());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 3a7cce7..fc29e8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,25 +14,50 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
 @Singleton
 public class SubscriberMetrics {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
   private static final String SUBSCRIBER_FAILURE_COUNTER =
       "subscriber_msg_consumer_failure_counter";
+  private static final String PROJECT_REPLICATION_LAG_MS_PREFIX =
+      "multi_site/subscriber/subscriber_replication_status/ms_behind_for_";
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
 
-  @Inject
-  public SubscriberMetrics(MetricMaker metricMaker) {
+  private MetricRegistry metricRegistry;
+  private MetricMaker metricMaker;
+  public Map<String, Long> replicationStatusPerProject = new HashMap<>();
 
+  private ProjectVersionRefUpdate projectVersionRefUpdate;
+
+  @Inject
+  public SubscriberMetrics(
+      MetricMaker metricMaker,
+      MetricRegistry metricRegistry,
+      ProjectVersionRefUpdate projectVersionRefUpdate) {
+
+    this.projectVersionRefUpdate = projectVersionRefUpdate;
+    this.metricMaker = metricMaker;
+    this.metricRegistry = metricRegistry;
     this.subscriberSuccessCounter =
         metricMaker.newCounter(
             "multi_site/subscriber/subscriber_message_consumer_counter",
@@ -57,4 +82,43 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
+
+  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
+    Event event = eventMessage.getEvent();
+    if (event instanceof RefReplicatedEvent) {
+      RefReplicatedEvent refReplicatedEvent = (RefReplicatedEvent) event;
+      String projectName = refReplicatedEvent.getProjectNameKey().get();
+      logger.atFine().log("Updating replication lag for %s", projectName);
+      Optional<Long> remoteVersion = projectVersionRefUpdate.getProjectRemoteVersion(projectName);
+      Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName);
+      if (remoteVersion.isPresent() && localVersion.isPresent()) {
+        Long lag = remoteVersion.get() - localVersion.get();
+        logger.atFine().log("Calculated lag for project '%s' [%d]", projectName, lag);
+        replicationStatusPerProject.put(projectName, lag);
+        upsertMetricsForProject(projectName);
+      } else {
+        logger.atWarning().log(
+            "Didn't update metric for %s. Local [%b] or remote [%b] version is not defined",
+            projectName, localVersion.isPresent(), remoteVersion.isPresent());
+      }
+    } else {
+      logger.atInfo().log("Not a ref-replicated-event event [%s], skipping", event.type);
+    }
+  }
+
+  private void upsertMetricsForProject(String projectName) {
+    String metricName = PROJECT_REPLICATION_LAG_MS_PREFIX + projectName;
+    if (metricRegistry.getGauges(MetricFilter.contains(metricName)).isEmpty()) {
+      metricMaker.newCallbackMetric(
+          metricName,
+          Long.class,
+          new Description(String.format("%s replication lag (ms)", metricName))
+              .setGauge()
+              .setUnit(Description.Units.MILLISECONDS),
+          () -> replicationStatusPerProject.get(projectName));
+      logger.atFine().log("Added last replication timestamp callback metric for '%s'", projectName);
+    } else {
+      logger.atFine().log("Don't add metric since it already exists for project '%s'", projectName);
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index a4866ea..bcdf63a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -18,22 +18,29 @@
 import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.google.common.base.CharMatcher;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.primitives.Ints;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.IntBlob;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Set;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.ObjectReader;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
@@ -41,6 +48,7 @@
 import org.eclipse.jgit.revwalk.RevWalk;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 
+@Singleton
 public class ProjectVersionRefUpdate implements EventListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   public static final String MULTI_SITE_VERSIONING_REF = "refs/multi-site/version";
@@ -194,6 +202,56 @@
     }
   }
 
+  public Optional<Long> getProjectLocalVersion(String projectName) {
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
+      Optional<IntBlob> blob = IntBlob.parse(repository, MULTI_SITE_VERSIONING_REF);
+      if (blob.isPresent()) {
+        Long repoVersion = Integer.toUnsignedLong(blob.get().value());
+        logger.atInfo().log("Local project '%s' has version %d", projectName, repoVersion);
+        return Optional.of(repoVersion);
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Cannot read local project '%s' version", projectName);
+    }
+    return Optional.empty();
+  }
+
+  public Optional<Long> getProjectRemoteVersion(String projectName) {
+    Optional<ObjectId> remoteObjectId =
+        sharedRefDb.get(
+            Project.NameKey.parse(projectName), MULTI_SITE_VERSIONING_REF, ObjectId.class);
+    if (remoteObjectId.isPresent()) {
+      return getLongFromObjectId(projectName, remoteObjectId.get());
+    } else {
+      logger.atSevere().log("Didn't find remote version for %s", projectName);
+      return Optional.empty();
+    }
+  }
+
+  private Optional<Long> getLongFromObjectId(String projectName, ObjectId objectId) {
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
+      ObjectReader or = repository.newObjectReader();
+      ObjectLoader ol = or.open(objectId, OBJ_BLOB);
+      if (ol.getType() != OBJ_BLOB) {
+        // In theory this should be thrown by open but not all implementations may do it properly
+        // (certainly InMemoryRepository doesn't).
+        logger.atSevere().log("Incorrect object type loaded for objectId %s", objectId.toString());
+        return Optional.empty();
+      }
+      String str = CharMatcher.whitespace().trimFrom(new String(ol.getCachedBytes(), UTF_8));
+      Integer value = Ints.tryParse(str);
+      logger.atInfo().log(
+          "Found remote version for project %s, value: %s - %d",
+          projectName, objectId.toString(), value);
+      return Optional.of(Integer.toUnsignedLong(value));
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Cannot parse objectId %s", objectId.toString());
+      return Optional.empty();
+    }
+  }
+
   private ObjectId updateLocalProjectVersion(Project.NameKey projectNameKey, String refName)
       throws LocalProjectVersionUpdateException {
     Long lastRefUpdatedTimestamp = getLastRefUpdatedTimestamp(projectNameKey, refName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
index aa15108..ef8d9b8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoopSharedRefDatabase.java
@@ -18,6 +18,7 @@
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
 import com.google.gerrit.reviewdb.client.Project;
+import java.util.Optional;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 
@@ -35,6 +36,12 @@
   }
 
   @Override
+  public <T> boolean compareAndPut(Project.NameKey project, String refName, T currValue, T newValue)
+      throws GlobalRefDbSystemError {
+    return false;
+  }
+
+  @Override
   public AutoCloseable lockRef(Project.NameKey project, String refName)
       throws GlobalRefDbLockException {
     return () -> {};
@@ -47,4 +54,10 @@
 
   @Override
   public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+  @Override
+  public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+      throws GlobalRefDbSystemError {
+    return Optional.empty();
+  }
 }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 42249b1..c1135bb 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -93,10 +93,6 @@
 
 `metric=plugins/multi-site/multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
 
-* Subscriber replication status (latest replication Epoch time in seconds) per instance
+* Subscriber replication status per project (ms behind the producer)
 
-`metric=plugins/multi-site/multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
-
-* Subscriber replication status (latest replication Epoch time in seconds) per project
-
-`metric=plugins/multi-site/multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+`metric=site/multi_site/subscriber/subscriber_replication_status/ms_behind_for_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index ff1c31c..7d38ac1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.validation;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 
@@ -31,6 +32,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationState;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
 import org.eclipse.jgit.junit.TestRepository;
@@ -86,7 +88,7 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
     verify(sharedRefDb, atMost(1))
         .compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
@@ -107,7 +109,7 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNull();
   }
 
@@ -120,7 +122,7 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refUpdatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNull();
   }
 
@@ -137,7 +139,7 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNotNull();
 
     verify(sharedRefDb, never())
@@ -163,7 +165,7 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNull();
   }
 
@@ -181,7 +183,47 @@
 
     new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
 
-    Ref ref = repo.getRepository().findRef(ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF);
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNull();
   }
+
+  @Test
+  public void getRemoteProjectVersionShouldReturnCorrectValue() throws IOException {
+    updateLocalVersion();
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+    when(sharedRefDb.get(A_TEST_PROJECT_NAME_KEY, MULTI_SITE_VERSIONING_REF, ObjectId.class))
+        .thenReturn(Optional.of(ref.getObjectId()));
+
+    Optional<Long> version =
+        new ProjectVersionRefUpdate(repoManager, sharedRefDb)
+            .getProjectRemoteVersion(A_TEST_PROJECT_NAME);
+
+    assertThat(version.isPresent()).isTrue();
+    assertThat(version.get()).isEqualTo(masterCommit.getCommitTime());
+  }
+
+  @Test
+  public void getLocalProjectVersionShouldReturnCorrectValue() throws IOException {
+    updateLocalVersion();
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+
+    Optional<Long> version =
+        new ProjectVersionRefUpdate(repoManager, sharedRefDb)
+            .getProjectLocalVersion(A_TEST_PROJECT_NAME);
+
+    assertThat(version.isPresent()).isTrue();
+    assertThat(version.get()).isEqualTo(masterCommit.getCommitTime());
+  }
+
+  private void updateLocalVersion() {
+    Context.setForwardedEvent(true);
+    RefReplicatedEvent refReplicatedEvent =
+        new RefReplicatedEvent(
+            A_TEST_PROJECT_NAME,
+            A_TEST_REF_NAME,
+            "targetNode",
+            ReplicationState.RefPushResult.SUCCEEDED,
+            RemoteRefUpdate.Status.OK);
+    new ProjectVersionRefUpdate(repoManager, sharedRefDb).onEvent(refReplicatedEvent);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
index 087259a..e2de613 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -31,6 +31,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
@@ -118,6 +119,13 @@
           }
 
           @Override
+          public <T> boolean compareAndPut(
+              Project.NameKey project, String refName, T currValue, T newValue)
+              throws GlobalRefDbSystemError {
+            return false;
+          }
+
+          @Override
           public AutoCloseable lockRef(Project.NameKey project, String refName)
               throws GlobalRefDbLockException {
             return null;
@@ -125,6 +133,12 @@
 
           @Override
           public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+          @Override
+          public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+              throws GlobalRefDbSystemError {
+            return Optional.empty();
+          }
         };
     return new SharedRefDatabaseWrapper(
         DynamicItem.itemOf(GlobalRefDatabase.class, sharedRefDatabase),