Merge "Add events-broker to local setup" into stable-3.0
diff --git a/BUILD b/BUILD
index eb2eeba..ec555d5 100644
--- a/BUILD
+++ b/BUILD
@@ -19,10 +19,16 @@
     deps = [
         "@global-refdb//jar",
         "@events-broker//jar",
-        "//plugins/replication",
+        ":replication-neverlink",
     ],
 )
 
+java_library(
+    name = "replication-neverlink",
+    neverlink = 1,  # Bazel: library is only used for compilation, not at runtime
+    exports = ["//plugins/replication"],  # lets dependents compile against the replication plugin
+)
+
 junit_tests(
     name = "multi_site_tests",
     srcs = glob(["src/test/java/**/*.java"]),
@@ -45,5 +51,6 @@
         "@wiremock//jar",
         "@global-refdb//jar",
         "@events-broker//jar",
+        "//plugins/replication",
     ],
 )
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 50ffb61..dc58d00 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -288,7 +288,7 @@
 REPLICATION_SSH_USER=${REPLICATION_SSH_USER:-$(whoami)}
 export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
-HTTPS_ENABLED=${HTTPS_ENABLED:-"true"}
+HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
 
 COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
 LOCATION_TEST_SITE_1=$COMMON_LOCATION/instance-1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java
new file mode 100644
index 0000000..e8b92d2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectReplicationStatus.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+/**
+ * Immutable holder for the replication status of a single Gerrit project.
+ *
+ * <p>NOTE: Currently, the status is only represented by the last replication timestamp, but it
+ * could be extended (e.g. the last replicated commit could be interesting information to add).
+ */
+public class ProjectReplicationStatus {
+
+  // Final: the value is fixed at construction, a newer status is represented by a new instance.
+  private final Long lastReplicationTimestamp;
+
+  /**
+   * Creates a status carrying the given timestamp.
+   *
+   * @param lastReplicationTimestamp timestamp of the project's last replication; stored as-is
+   */
+  public ProjectReplicationStatus(Long lastReplicationTimestamp) {
+    this.lastReplicationTimestamp = lastReplicationTimestamp;
+  }
+
+  /**
+   * Returns the timestamp this status was created with.
+   *
+   * @return the last replication timestamp, exactly as passed to the constructor
+   */
+  public Long getLastReplicationTimestamp() {
+    return lastReplicationTimestamp;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java
new file mode 100644
index 0000000..5f5d393
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStore.java
@@ -0,0 +1,76 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.google.inject.Singleton;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Stores the replication status of a Gerrit instance.
+ *
+ * <p>The status is kept per project and also globally. The global status is the highest
+ * replication timestamp recorded across all projects; the status of a single project is
+ * represented by {@link ProjectReplicationStatus}.
+ *
+ * <p>All accessors are synchronized: this is a shared singleton whose writers and readers are not
+ * guaranteed to run on the same thread.
+ */
+@Singleton
+public class ReplicationStatusStore {
+
+  private final Map<String, ProjectReplicationStatus> statusPerProject = new HashMap<>();
+  private long globalLastReplicationTime = 0L;
+
+  public ReplicationStatusStore() {}
+
+  /**
+   * Records a replication of the given project.
+   *
+   * <p>The project's status is replaced with the given timestamp. The global timestamp only moves
+   * forward, so an update carrying an older timestamp cannot roll it back.
+   *
+   * @param projectName name of the replicated project
+   * @param timestamp time of the replication; {@code null} is stored for the project but leaves
+   *     the global timestamp untouched
+   */
+  public synchronized void updateLastReplicationTime(String projectName, Long timestamp) {
+    statusPerProject.put(projectName, new ProjectReplicationStatus(timestamp));
+    if (timestamp != null && timestamp > globalLastReplicationTime) {
+      globalLastReplicationTime = timestamp;
+    }
+  }
+
+  /**
+   * Returns the last replication timestamp recorded for a project.
+   *
+   * @param projectName name of the project to look up
+   * @return the recorded timestamp, or empty when nothing was recorded for the project
+   */
+  public synchronized Optional<Long> getLastReplicationTime(String projectName) {
+    return Optional.ofNullable(statusPerProject.get(projectName))
+        .map(ProjectReplicationStatus::getLastReplicationTimestamp);
+  }
+
+  /**
+   * Returns the instance-wide replication timestamp.
+   *
+   * @return the highest timestamp recorded so far, or 0 when nothing was recorded yet
+   */
+  public synchronized Long getGlobalLastReplicationTime() {
+    return globalLastReplicationTime;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 9fdf87c..d0271d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -75,6 +75,7 @@
         msgLog.log(Direction.CONSUME, topic, event);
         eventRouter.route(event.getEvent());
         subscriberMetrics.incrementSubscriberConsumedMessage();
+        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
         logger.atSevere().withCause(e).log(
             "Malformed event '%s': [Exception: %s]", event.getHeader());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 3a7cce7..6402e4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,24 +14,47 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ReplicationStatusStore;
+import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
 
 @Singleton
 public class SubscriberMetrics {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
   private static final String SUBSCRIBER_FAILURE_COUNTER =
       "subscriber_msg_consumer_failure_counter";
+  private static final String INSTANCE_LATEST_REPLICATION_TIME_METRIC =
+      "multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs";
+  private static final String PROJECT_LATEST_REPLICATION_TIME_METRIC_PREFIX =
+      "multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_";
+
+  private ReplicationStatusStore replicationStatusStore;
+  private MetricMaker metricMaker;
+  private MetricRegistry metricRegistry;
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
 
   @Inject
-  public SubscriberMetrics(MetricMaker metricMaker) {
+  public SubscriberMetrics(
+      MetricMaker metricMaker,
+      MetricRegistry metricRegistry,
+      ReplicationStatusStore replicationStatusStore) {
+
+    this.replicationStatusStore = replicationStatusStore;
+    this.metricMaker = metricMaker;
+    this.metricRegistry = metricRegistry;
 
     this.subscriberSuccessCounter =
         metricMaker.newCounter(
@@ -48,6 +71,16 @@
                 .setUnit("errors"),
             Field.ofString(
                 SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+
+    // Epoch seconds, as the metric name says: the store is fed from Event.eventCreatedOn.
+    metricMaker.newCallbackMetric(
+        INSTANCE_LATEST_REPLICATION_TIME_METRIC,
+        Long.class,
+        new Description(
+                INSTANCE_LATEST_REPLICATION_TIME_METRIC + " last replication timestamp (secs)")
+            .setGauge()
+            .setUnit(Description.Units.SECONDS),
+        () -> replicationStatusStore.getGlobalLastReplicationTime());
   }
 
   public void incrementSubscriberConsumedMessage() {
@@ -57,4 +90,51 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
+
+  /**
+   * Records the replication time carried by a consumed event and ensures that the replicated
+   * project has a metric exposing it.
+   *
+   * <p>Only {@link RefReplicatedEvent}s are considered; any other event is logged and skipped.
+   *
+   * @param eventMessage the consumed message wrapping the event
+   */
+  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
+    Event event = eventMessage.getEvent();
+    if (event instanceof RefReplicatedEvent) {
+      RefReplicatedEvent refReplicatedEvent = (RefReplicatedEvent) event;
+      String projectName = refReplicatedEvent.getProjectNameKey().get();
+      logger.atInfo().log("Updating last replication time for %s", projectName);
+      replicationStatusStore.updateLastReplicationTime(projectName, event.eventCreatedOn);
+      upsertMetricsForProject(projectName);
+    } else {
+      logger.atInfo().log("Not a ref-replicated-event event [%s], skipping", event.type);
+    }
+  }
+
+  /**
+   * Registers the callback metric of a project unless the registry already holds it.
+   *
+   * <p>The metric is declared in seconds, in line with its name: the reported value is the
+   * {@code eventCreatedOn} of a replication event, which Gerrit expresses in seconds.
+   *
+   * @param projectName name of the project the metric reports on
+   */
+  private void upsertMetricsForProject(String projectName) {
+    String metricName = PROJECT_LATEST_REPLICATION_TIME_METRIC_PREFIX + projectName;
+    // Match the end of the registered name rather than any substring of it: otherwise the gauge
+    // of project "foo-bar" would make project "foo" look as if it were already registered.
+    if (metricRegistry.getGauges(MetricFilter.endsWith(metricName)).isEmpty()) {
+      metricMaker.newCallbackMetric(
+          metricName,
+          Long.class,
+          new Description(String.format("%s last replication timestamp (secs)", metricName))
+              .setGauge()
+              .setUnit(Description.Units.SECONDS),
+          () -> replicationStatusStore.getLastReplicationTime(projectName).orElse(0L));
+      logger.atInfo().log("Added last replication timestamp callback metric for %s", projectName);
+    } else {
+      logger.atInfo().log("Don't add metric since it already exists for project %s", projectName);
+    }
+  }
 }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 841b0d4..a868213 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -83,12 +83,20 @@
 ### Message subscriber
 * Subscriber message consumed count
 
-`multi_site/subscriber/subscriber_message_consumer_counter/subscriber_msg_consumer_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_counter/subscriber_msg_consumer_counter, type=com.codahale.metrics.Meter`
 
 * Subscriber failed to consume message count
 
-`multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_failure_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_failure_counter, type=com.codahale.metrics.Meter`
 
 * Subscriber failed to poll messages count
 
-`multi_site/subscriber/subscriber_message_consumer_poll_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
+`metric=multi_site/subscriber/subscriber_message_consumer_poll_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
+
+* Subscriber replication status (latest replication Epoch time in seconds) per instance
+
+`metric=multi_site/subscriber/subscriber_replication_status/instance_latest_replication_epochtime_secs, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+
+* Subscriber replication status (latest replication Epoch time in seconds) per project
+
+`metric=multi_site/subscriber/subscriber_replication_status/latest_replication_epochtime_secs_<projectName>, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java
new file mode 100644
index 0000000..314e4b4
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ReplicationStatusStoreTest.java
@@ -0,0 +1,57 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import java.time.Instant;
+import org.junit.Test;
+
+/** Unit tests for {@link ReplicationStatusStore}. */
+public class ReplicationStatusStoreTest {
+
+  /** Updating a project must be reflected in the instance-wide timestamp. */
+  @Test
+  public void shouldUpdateGlobalStatus() {
+    ReplicationStatusStore store = new ReplicationStatusStore();
+    Long timestamp = Instant.now().toEpochMilli();
+
+    store.updateLastReplicationTime("myProject", timestamp);
+
+    assertThat(store.getGlobalLastReplicationTime()).isEqualTo(timestamp);
+  }
+
+  /** Updating a project must be reflected in that project's own timestamp. */
+  @Test
+  public void shouldUpdateProjectStatus() {
+    ReplicationStatusStore store = new ReplicationStatusStore();
+    String project = "myProject";
+    Long timestamp = Instant.now().toEpochMilli();
+
+    store.updateLastReplicationTime(project, timestamp);
+
+    assertThat(store.getLastReplicationTime(project)).isEqualTo(of(timestamp));
+  }
+
+  /** A project that was never updated has no timestamp. */
+  @Test
+  public void shouldNotReturnProjectStatus() {
+    ReplicationStatusStore store = new ReplicationStatusStore();
+
+    assertThat(store.getLastReplicationTime("nonExistentProject")).isEqualTo(empty());
+  }
+}