Expose replication status metrics

Expose metrics about the replication status of the instance and of each project.
Currently only the timestamp of the last ref replicated of each project is exposed.

This can be useful to monitor instances going out of sync in the cluster.

Feature: Issue 12121
Change-Id: If0935a1932d4bd3fa7e6a0674510bb8fbb48c836
(cherry picked from commit c8b01ddb08ac89cbab47cd05b1f2da45f75120a9)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java
new file mode 100644
index 0000000..e8b92d2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+/**
+ * This class contains the replication status of a Gerrit project
+ *
+ * <p>NOTE: Currently, the status is only represented by the last replication timestamp, but it
+ * could be extended (i.e.: the last replicated commit can be an interesting information to add)
+ */
+public class ProjectReplicationStatus {
+
+  private Long lastReplicationTimestamp;
+
+  public ProjectReplicationStatus(Long lastReplicationTimestamp) {
+    this.lastReplicationTimestamp = lastReplicationTimestamp;
+  }
+
+  public Long getLastReplicationTimestamp() {
+    return this.lastReplicationTimestamp;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java
new file mode 100644
index 0000000..5f5d393
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.google.inject.Singleton;
+import java.util.*;
+
+/**
+ * This class stores the replication status of a Gerrit instance
+ *
+ * <p>The status is represented per project but also globally. The global replication status is, for
+ * example, the max replication timestamp of all the projects. The replication Status of a project
+ * is represented by {@see com.googlesource.gerrit.plugins.multisite.ProjectReplicationStatus}
+ */
+@Singleton
+public class ReplicationStatusStore {
+
+  private Map<String, ProjectReplicationStatus> statusPerProject;
+  private Long globalLastReplicationTime = 0L;
+
+  public ReplicationStatusStore() {
+    this.statusPerProject = new HashMap<String, ProjectReplicationStatus>();
+  }
+
+  public void updateLastReplicationTime(String projectName, Long timestamp) {
+    ProjectReplicationStatus projectReplicationStatus = new ProjectReplicationStatus(timestamp);
+    this.statusPerProject.put(projectName, projectReplicationStatus);
+    this.globalLastReplicationTime = timestamp;
+  }
+
+  public Optional<Long> getLastReplicationTime(String projectName) {
+    Optional<ProjectReplicationStatus> maybeProjectReplicationStatus =
+        Optional.ofNullable(this.statusPerProject.get(projectName));
+    return maybeProjectReplicationStatus.map(ProjectReplicationStatus::getLastReplicationTimestamp);
+  }
+
+  public Long getGlobalLastReplicationTime() {
+    return this.globalLastReplicationTime;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 9fdf87c..d0271d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -75,6 +75,7 @@
         msgLog.log(Direction.CONSUME, topic, event);
         eventRouter.route(event.getEvent());
         subscriberMetrics.incrementSubscriberConsumedMessage();
+        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
         logger.atSevere().withCause(e).log(
             "Malformed event '%s': [Exception: %s]", event.getHeader());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 3a7cce7..6402e4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,24 +14,47 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ReplicationStatusStore;
+import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
 
 @Singleton
 public class SubscriberMetrics {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
   private static final String SUBSCRIBER_FAILURE_COUNTER =
       "subscriber_msg_consumer_failure_counter";
+  private static final String INSTANCE_LATEST_REPLICATION_TIME_METRIC =
+      "multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs";
+  private static final String PROJECT_LATEST_REPLICATION_TIME_METRIC_PREFIX =
+      "multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_";
+
+  private ReplicationStatusStore replicationStatusStore;
+  private MetricMaker metricMaker;
+  private MetricRegistry metricRegistry;
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
 
   @Inject
-  public SubscriberMetrics(MetricMaker metricMaker) {
+  public SubscriberMetrics(
+      MetricMaker metricMaker,
+      MetricRegistry metricRegistry,
+      ReplicationStatusStore replicationStatusStore) {
+
+    this.replicationStatusStore = replicationStatusStore;
+    this.metricMaker = metricMaker;
+    this.metricRegistry = metricRegistry;
 
     this.subscriberSuccessCounter =
         metricMaker.newCounter(
@@ -48,6 +71,16 @@
                 .setUnit("errors"),
             Field.ofString(
                 SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+
+    metricMaker.newCallbackMetric(
+        INSTANCE_LATEST_REPLICATION_TIME_METRIC,
+        Long.class,
+        new Description(
+                String.format(
+                    "%s last replication timestamp (ms)", INSTANCE_LATEST_REPLICATION_TIME_METRIC))
+            .setGauge()
+            .setUnit(Description.Units.MILLISECONDS),
+        () -> replicationStatusStore.getGlobalLastReplicationTime());
   }
 
   public void incrementSubscriberConsumedMessage() {
@@ -57,4 +90,33 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
+
+  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
+    Event event = eventMessage.getEvent();
+    if (event instanceof RefReplicatedEvent) {
+      RefReplicatedEvent refReplicatedEvent = (RefReplicatedEvent) event;
+      String projectName = refReplicatedEvent.getProjectNameKey().get();
+      logger.atInfo().log("Updating last replication time for %s", projectName);
+      replicationStatusStore.updateLastReplicationTime(projectName, event.eventCreatedOn);
+      upsertMetricsForProject(projectName);
+    } else {
+      logger.atInfo().log("Not a ref-replicated-event event [%s], skipping", event.type);
+    }
+  }
+
+  private void upsertMetricsForProject(String projectName) {
+    String metricName = PROJECT_LATEST_REPLICATION_TIME_METRIC_PREFIX + projectName;
+    if (metricRegistry.getGauges(MetricFilter.contains(metricName)).isEmpty()) {
+      metricMaker.newCallbackMetric(
+          metricName,
+          Long.class,
+          new Description(String.format("%s last replication timestamp (ms)", metricName))
+              .setGauge()
+              .setUnit(Description.Units.MILLISECONDS),
+          () -> replicationStatusStore.getLastReplicationTime(projectName).orElse(0L));
+      logger.atInfo().log("Added last replication timestamp callback metric for " + projectName);
+    } else {
+      logger.atInfo().log("Don't add metric since it already exists for project " + projectName);
+    }
+  }
 }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index f26ee92..ff6cdff 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -81,12 +81,20 @@
 ### Message subscriber
 * Subscriber message consumed count
 
-`multi_site/subscriber/subscriber_message_consumer_counter/subscriber_msg_consumer_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_counter/subscriber_msg_consumer_counter, type=com.codahale.metrics.Meter`
 
 * Subscriber failed to consume message count
 
-`multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_failure_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_failure_counter, type=com.codahale.metrics.Meter`
 
 * Subscriber failed to poll messages count
 
-`multi_site/subscriber/subscriber_message_consumer_poll_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_poll_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
+
+* Subscriber replication status (latest replication Epoch time in seconds) per instance
+
+`metric=multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+
+* Subscriber replication status (latest replication Epoch time in seconds) per project
+
+`metric=multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java
new file mode 100644
index 0000000..314e4b4
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import java.time.Instant;
+import org.junit.Test;
+
+public class ReplicationStatusStoreTest {
+
+  @Test
+  public void shouldUpdateGlobalStatus() {
+    Instant instant = Instant.now();
+    Long nowish = instant.toEpochMilli();
+    ReplicationStatusStore replicationStatusStore = new ReplicationStatusStore();
+
+    replicationStatusStore.updateLastReplicationTime("myProject", nowish);
+
+    assertThat(replicationStatusStore.getGlobalLastReplicationTime()).isEqualTo(nowish);
+  }
+
+  @Test
+  public void shouldUpdateProjectStatus() {
+    String projectName = "myProject";
+    Instant instant = Instant.now();
+    Long nowish = instant.toEpochMilli();
+    ReplicationStatusStore replicationStatusStore = new ReplicationStatusStore();
+
+    replicationStatusStore.updateLastReplicationTime(projectName, nowish);
+
+    assertThat(replicationStatusStore.getLastReplicationTime(projectName)).isEqualTo(of(nowish));
+  }
+
+  @Test
+  public void shouldNotReturnProjectStatus() {
+    ReplicationStatusStore replicationStatusStore = new ReplicationStatusStore();
+
+    assertThat(replicationStatusStore.getLastReplicationTime("nonExistentProject"))
+        .isEqualTo(empty());
+  }
+}