Merge branch 'stable-3.4'

* stable-3.4:
  Delete replication metrics upon project deletion
  Introduce persistent cache for replication-status
  Fix issue with skipping events from high-availability
  Introduce endpoint to query replication-lag
  Extract replication status logic from subscriber metrics
  Documentation: fix default replication type for local setup
  Fix healthcheck dependency name in local setup script
  Introduce local (docker) setup for e2e tests
  Bump up localstack from 0.12.8 to 0.12.17.5

Change-Id: Iffff8d6c0689e804a7db00f8d0e888bbcd78f2ea
diff --git a/BUILD b/BUILD
index 39e1bc3..d7bd3d8 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
     manifest_entries = [
         "Gerrit-PluginName: multi-site",
         "Gerrit-Module: com.googlesource.gerrit.plugins.multisite.PluginModule",
+        "Gerrit-HttpModule: com.googlesource.gerrit.plugins.multisite.http.HttpModule",
         "Implementation-Title: multi-site plugin",
         "Implementation-URL: https://review.gerrithub.io/admin/repos/GerritForge/plugins_multi-site",
     ],
diff --git a/README.md b/README.md
index 34f7709..79094d3 100644
--- a/README.md
+++ b/README.md
@@ -86,3 +86,8 @@
 You also need to setup the Git-level replication between nodes, for more details
 please refer to the
 [replication plugin documentation](https://gerrit.googlesource.com/plugins/replication/+/refs/heads/master/src/main/resources/Documentation/config.md).
+
+# HTTP endpoints
+
+For information about available HTTP endpoints please refer to
+the [documentation](src/main/resources/Documentation/http-endpoints.md).
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index fca46d8..88b40d5 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -74,7 +74,7 @@
 [--gerrit2-httpd-port]          Gerrit Instance 2 http port; default 18081
 [--gerrit2-sshd-port]           Gerrit Instance 2 sshd port; default 49418
 
-[--replication-type]            Options [file,ssh]; default ssh
+[--replication-type]            Options [file,ssh]; default file
 [--replication-ssh-user]        SSH user for the replication plugin; default $(whoami)
 [--replication-delay]           Replication delay across the two instances in seconds
 
diff --git a/setup_local_env/docker-compose-kinesis.yaml b/setup_local_env/docker-compose-kinesis.yaml
index 15c609e..04429af 100644
--- a/setup_local_env/docker-compose-kinesis.yaml
+++ b/setup_local_env/docker-compose-kinesis.yaml
@@ -1,7 +1,7 @@
 version: '3'
 services:
   kinesis:
-    image: localstack/localstack:0.12.8
+    image: localstack/localstack:0.12.17.5
     ports:
       - "4566:4566"
       - "4751:4751"
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 0115c36..4fee129 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -418,7 +418,7 @@
   wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
   -O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
 ing"; exit 1; }
-  wget $GERRIT_CI/plugin-healthcheck-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
+  wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
   -O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
 ing"; exit 1; }
 else
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 81a0d8d..20aaef6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -22,6 +22,7 @@
 import com.google.inject.Scopes;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
 
@@ -41,6 +42,7 @@
     install(new BrokerForwarderModule());
     listener().to(MultiSiteConsumerRunner.class);
 
+    install(new ReplicationStatusModule());
     if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
       listener().to(PluginStartup.class);
       DynamicSet.bind(binder(), ProjectDeletedListener.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
new file mode 100644
index 0000000..7ce4606
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -0,0 +1,168 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.server.cache.CacheModule;
+import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
+import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Singleton
+public class ReplicationStatus implements LifecycleListener, ProjectDeletedListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
+  static final String REPLICATION_STATUS_CACHE = "replication_status";
+
+  public static Module cacheModule() {
+    return new CacheModule() {
+      @Override
+      protected void configure() {
+        persist(REPLICATION_STATUS_CACHE, String.class, Long.class)
+            .version(1)
+            .keySerializer(StringCacheSerializer.INSTANCE)
+            .valueSerializer(new JavaCacheSerializer<>());
+      }
+    };
+  }
+
+  private final Map<String, Long> localVersionPerProject = new HashMap<>();
+  private final Cache<String, Long> cache;
+  private final ProjectVersionRefUpdate projectVersionRefUpdate;
+  private final ProjectVersionLogger verLogger;
+  private final ProjectCache projectCache;
+
+  @Inject
+  public ReplicationStatus(
+      @Named(REPLICATION_STATUS_CACHE) Cache<String, Long> cache,
+      ProjectVersionRefUpdate projectVersionRefUpdate,
+      ProjectVersionLogger verLogger,
+      ProjectCache projectCache) {
+    this.cache = cache;
+    this.projectVersionRefUpdate = projectVersionRefUpdate;
+    this.verLogger = verLogger;
+    this.projectCache = projectCache;
+  }
+
+  public Long getMaxLag() {
+    Collection<Long> lags = replicationStatusPerProject.values();
+    if (lags.isEmpty()) {
+      return 0L;
+    }
+    return Collections.max(lags);
+  }
+
+  public Map<String, Long> getReplicationLags(Integer limit) {
+    return replicationStatusPerProject.entrySet().stream()
+        .sorted((c1, c2) -> c2.getValue().compareTo(c1.getValue()))
+        .limit(limit)
+        .collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                Map.Entry::getValue,
+                (oldValue, newValue) -> oldValue,
+                LinkedHashMap::new));
+  }
+
+  public void updateReplicationLag(Project.NameKey projectName) {
+    Optional<Long> remoteVersion =
+        projectVersionRefUpdate.getProjectRemoteVersion(projectName.get());
+    Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
+    if (remoteVersion.isPresent() && localVersion.isPresent()) {
+      long lag = remoteVersion.get() - localVersion.get();
+
+      if (!localVersion.get().equals(localVersionPerProject.get(projectName.get()))
+          || lag != replicationStatusPerProject.get(projectName.get())) {
+        logger.atFine().log(
+            "Updated replication lag for project '%s' of %d sec(s) [local-ref=%d global-ref=%d]",
+            projectName, lag, localVersion.get(), remoteVersion.get());
+        doUpdateLag(projectName, lag);
+        localVersionPerProject.put(projectName.get(), localVersion.get());
+        verLogger.log(projectName, localVersion.get(), lag);
+      }
+    } else {
+      logger.atFine().log(
+          "Did not update replication lag for %s because the %s version is not defined",
+          projectName, localVersion.isPresent() ? "remote" : "local");
+    }
+  }
+
+  void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
+    Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
+
+    if (!localVersion.isPresent() && replicationStatusPerProject.containsKey(projectName.get())) {
+      cache.invalidate(projectName.get());
+      replicationStatusPerProject.remove(projectName.get());
+      localVersionPerProject.remove(projectName.get());
+      verLogger.logDeleted(projectName);
+      logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
+    }
+  }
+
+  @VisibleForTesting
+  public void doUpdateLag(Project.NameKey projectName, Long lag) {
+    cache.put(projectName.get(), lag);
+    replicationStatusPerProject.put(projectName.get(), lag);
+  }
+
+  @VisibleForTesting
+  Long getReplicationStatus(String projectName) {
+    return replicationStatusPerProject.get(projectName);
+  }
+
+  @VisibleForTesting
+  Long getLocalVersion(String projectName) {
+    return localVersionPerProject.get(projectName);
+  }
+
+  @Override
+  public void start() {
+    loadAllFromCache();
+  }
+
+  @Override
+  public void stop() {}
+
+  private void loadAllFromCache() {
+    Set<String> cachedProjects =
+        projectCache.all().stream().map(Project.NameKey::get).collect(Collectors.toSet());
+    replicationStatusPerProject.putAll(cache.getAllPresent(cachedProjects));
+  }
+
+  @Override
+  public void onProjectDeleted(Event event) {
+    removeProjectFromReplicationLagMetrics(Project.nameKey(event.getProjectName()));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
new file mode 100644
index 0000000..791517f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
+
+public class ReplicationStatusModule extends LifecycleModule {
+  @Override
+  protected void configure() {
+    bind(ReplicationStatus.class).in(Scopes.SINGLETON);
+    install(ReplicationStatus.cacheModule());
+    listener().to(ReplicationStatus.class);
+    DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index f7d5f99..cce74f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,9 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.entities.Project;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.MetricMaker;
@@ -26,17 +24,10 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
-import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 
 @Singleton
 public class SubscriberMetrics extends MultiSiteMetrics {
@@ -49,20 +40,12 @@
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
-  private final ProjectVersionLogger verLogger;
-
-  private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
-  private final Map<String, Long> localVersionPerProject = new HashMap<>();
-
-  private ProjectVersionRefUpdate projectVersionRefUpdate;
+  private final ReplicationStatus replicationStatus;
 
   @Inject
-  public SubscriberMetrics(
-      MetricMaker metricMaker,
-      ProjectVersionRefUpdate projectVersionRefUpdate,
-      ProjectVersionLogger verLogger) {
+  public SubscriberMetrics(MetricMaker metricMaker, ReplicationStatus replicationStatus) {
+    this.replicationStatus = replicationStatus;
 
-    this.projectVersionRefUpdate = projectVersionRefUpdate;
     this.subscriberSuccessCounter =
         metricMaker.newCounter(
             "multi_site/subscriber/subscriber_message_consumer_counter",
@@ -81,15 +64,7 @@
         REPLICATION_LAG_SEC,
         Long.class,
         new Description("Replication lag (sec)").setGauge().setUnit(Description.Units.SECONDS),
-        () -> {
-          Collection<Long> lags = replicationStatusPerProject.values();
-          if (lags.isEmpty()) {
-            return 0L;
-          }
-          return Collections.max(lags);
-        });
-
-    this.verLogger = verLogger;
+        replicationStatus::getMaxLag);
   }
 
   public void incrementSubscriberConsumedMessage() {
@@ -107,55 +82,11 @@
         || event instanceof ReplicationScheduledEvent
         || event instanceof RefUpdatedEvent) {
       ProjectEvent projectEvent = (ProjectEvent) event;
-      updateReplicationLagMetrics(projectEvent.getProjectNameKey());
+      replicationStatus.updateReplicationLag(projectEvent.getProjectNameKey());
     } else if (event instanceof ProjectDeletionReplicationSucceededEvent) {
       ProjectDeletionReplicationSucceededEvent projectDeletion =
           (ProjectDeletionReplicationSucceededEvent) event;
-      removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
+      replicationStatus.removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
     }
   }
-
-  private void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
-    Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
-
-    if (!localVersion.isPresent() && localVersionPerProject.containsKey(projectName.get())) {
-      replicationStatusPerProject.remove(projectName.get());
-      localVersionPerProject.remove(projectName.get());
-      verLogger.logDeleted(projectName);
-      logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
-    }
-  }
-
-  private void updateReplicationLagMetrics(Project.NameKey projectName) {
-    Optional<Long> remoteVersion =
-        projectVersionRefUpdate.getProjectRemoteVersion(projectName.get());
-    Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
-    if (remoteVersion.isPresent() && localVersion.isPresent()) {
-      long lag = remoteVersion.get() - localVersion.get();
-
-      if (!localVersion.get().equals(localVersionPerProject.get(projectName.get()))
-          || lag != replicationStatusPerProject.get(projectName.get())) {
-        logger.atFine().log(
-            "Published replication lag metric for project '%s' of %d sec(s) [local-ref=%d global-ref=%d]",
-            projectName, lag, localVersion.get(), remoteVersion.get());
-        replicationStatusPerProject.put(projectName.get(), lag);
-        localVersionPerProject.put(projectName.get(), localVersion.get());
-        verLogger.log(projectName, localVersion.get(), lag);
-      }
-    } else {
-      logger.atFine().log(
-          "Did not publish replication lag metric for %s because the %s version is not defined",
-          projectName, localVersion.isPresent() ? "remote" : "local");
-    }
-  }
-
-  @VisibleForTesting
-  Long getReplicationStatus(String projectName) {
-    return replicationStatusPerProject.get(projectName);
-  }
-
-  @VisibleForTesting
-  Long getLocalVersion(String projectName) {
-    return localVersionPerProject.get(projectName);
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
index 329b5cb..c5706a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
@@ -15,9 +15,9 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
 public abstract class ForwarderTask implements Runnable {
-  private final Thread callerThread = Thread.currentThread();
+  private final String callerThreadName = Thread.currentThread().getName();
 
-  public Thread getCallerThread() {
-    return callerThread;
+  public String getCallerThreadName() {
+    return callerThreadName;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index ac4c38e..19936ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -35,7 +35,7 @@
   }
 
   protected boolean currentThreadBelongsToHighAvailabilityPlugin(ForwarderTask task) {
-    String currentThreadName = task.getCallerThread().getName();
+    String currentThreadName = task.getCallerThreadName();
 
     return currentThreadName.contains(HIGH_AVAILABILITY_PLUGIN)
         || currentThreadName.contains(HIGH_AVAILABILITY_FORWARDER)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
new file mode 100644
index 0000000..2ae2d9b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import com.google.inject.servlet.ServletModule;
+
+public class HttpModule extends ServletModule {
+
+  public static final String LAG_ENDPOINT_SEGMENT = "replication-lag";
+
+  @Override
+  protected void configureServlets() {
+    serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java
new file mode 100644
index 0000000..e2b3e18
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java
@@ -0,0 +1,84 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import static com.google.gerrit.server.permissions.GlobalPermission.ADMINISTRATE_SERVER;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatus;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Optional;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@Singleton
+public class ReplicationStatusServlet extends HttpServlet {
+  protected static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final String LIMIT_RESULT_PARAMETER = "limit";
+  private static final long serialVersionUID = 1L;
+  private static final Integer DEFAULT_LIMIT_RESULT_PARAMETER = 10;
+
+  private final Gson gson;
+  private final ReplicationStatus replicationStatus;
+  private final PermissionBackend permissionBackend;
+
+  @Inject
+  ReplicationStatusServlet(
+      Gson gson, ReplicationStatus replicationStatus, PermissionBackend permissionBackend) {
+    this.gson = gson;
+    this.replicationStatus = replicationStatus;
+    this.permissionBackend = permissionBackend;
+  }
+
+  @Override
+  protected void doGet(
+      HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse)
+      throws ServletException, IOException {
+    if (!permissionBackend.currentUser().testOrFalse(ADMINISTRATE_SERVER)) {
+      setResponse(
+          httpServletResponse,
+          HttpServletResponse.SC_FORBIDDEN,
+          String.format("%s permissions required. Operation not permitted", ADMINISTRATE_SERVER));
+      return;
+    }
+
+    int limitResult =
+        Optional.ofNullable(httpServletRequest.getParameter(LIMIT_RESULT_PARAMETER))
+            .map(Integer::parseInt)
+            .orElse(DEFAULT_LIMIT_RESULT_PARAMETER);
+
+    setResponse(
+        httpServletResponse,
+        HttpServletResponse.SC_OK,
+        gson.toJson(replicationStatus.getReplicationLags(limitResult)));
+  }
+
+  static void setResponse(HttpServletResponse httpResponse, int statusCode, String value)
+      throws IOException {
+    httpResponse.setContentType("application/json");
+    httpResponse.setStatus(statusCode);
+    PrintWriter writer = httpResponse.getWriter();
+    writer.print(new String(RestApiServlet.JSON_MAGIC));
+    writer.print(value);
+  }
+}
diff --git a/src/main/resources/Documentation/http-endpoints.md b/src/main/resources/Documentation/http-endpoints.md
new file mode 100644
index 0000000..ff56fdf
--- /dev/null
+++ b/src/main/resources/Documentation/http-endpoints.md
@@ -0,0 +1,42 @@
+HTTP endpoints
+=========================
+
+The @PLUGIN@ plugin also provides HTTP endpoints, as described below:
+
+## replication-lag
+
+Admin users can query for the replication lag in order to understand which
+projects' replication is running behind and by how much (in seconds).
+
+The results are returned in a map ordered in descending order by the replication
+lag, so that the most behind projects are returned first.
+
+Whilst some lag information is also available as a metric (see
+the [documentation](./about.md#metrics)), this endpoint provides more
+information since it shows _which_ project is associated with _which specific lag_.
+
+You can query the endpoint (at the receiving end of the replication) as follows:
+
+```bash
+curl -v -XGET -u <admin> '<gerrit>/a/plugins/multi-site/replication-lag?[limit=LIMIT]'
+```
+
+Output example:
+
+```
+)]}'
+{
+  "baz": 6432,
+  "some/other/project": 1451,
+  "All-Users": 62,
+  "bar": 13,
+  "foo": 0
+}
+```
+
+Optionally the REST endpoint can receive the following additional arguments:
+
+* limit=LIMIT
+
+maximum number of projects to return
+*default:10*
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
new file mode 100644
index 0000000..aec9404
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -0,0 +1,161 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.server.project.ProjectCache;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import java.util.Optional;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplicationStatusTest {
+
+  @Mock private ProjectVersionLogger verLogger;
+  @Mock private ProjectCache projectCache;
+  @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
+  private ReplicationStatus objectUnderTest;
+  private Cache<String, Long> replicationStatusCache;
+
+  @Before
+  public void setup() throws Exception {
+    when(projectCache.all())
+        .thenReturn(
+            ImmutableSortedSet.of(Project.nameKey("projectA"), Project.nameKey("projectB")));
+    replicationStatusCache = CacheBuilder.newBuilder().build();
+    objectUnderTest =
+        new ReplicationStatus(
+            replicationStatusCache, projectVersionRefUpdate, verLogger, projectCache);
+  }
+
+  @Test
+  public void shouldPopulateLagsFromPersistedCacheOnStart() {
+    replicationStatusCache.put("projectA", 10L);
+    replicationStatusCache.put("projectB", 3L);
+
+    objectUnderTest.start();
+    assertThat(objectUnderTest.getMaxLag()).isEqualTo(10L);
+  }
+
+  @Test
+  public void shouldBeAbleToUpdatePersistedCacheValues() {
+    replicationStatusCache.put("projectA", 3L);
+
+    objectUnderTest.start();
+
+    objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
+    assertThat(objectUnderTest.getMaxLag()).isEqualTo(20L);
+  }
+
+  @Test
+  public void shouldCombinePersistedProjectsWithNewEntries() {
+    replicationStatusCache.put("projectA", 3L);
+    objectUnderTest.start();
+
+    objectUnderTest.doUpdateLag(Project.nameKey("projectB"), 20L);
+
+    assertThat(objectUnderTest.getReplicationLags(2).keySet())
+        .containsExactly("projectA", "projectB");
+  }
+
+  @Test
+  public void shouldUpdatePersistedCacheWhenUpdatingLagValue() {
+    objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
+
+    assertThat(replicationStatusCache.getIfPresent("projectA")).isEqualTo(20L);
+  }
+
+  @Test
+  public void shouldRemoveProjectFromPersistedCache() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+  }
+
+  @Test
+  public void shouldRemoveProjectFromReplicationLags() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).isEmpty();
+  }
+
+  @Test
+  public void shouldNotRemoveProjectFromReplicationLagsIfLocalVersionStillExists() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(System.currentTimeMillis()));
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+  }
+
+  @Test
+  public void shouldNotEvictFromPersistentCacheIfLocalVersionStillExists() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(System.currentTimeMillis()));
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(lag);
+  }
+
+  private void setupReplicationLag(String projectName, long lag) {
+    long currentVersion = System.currentTimeMillis();
+    long newVersion = currentVersion + lag;
+    replicationStatusCache.put(projectName, 3L);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(projectName))
+        .thenReturn(Optional.of(newVersion));
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(currentVersion));
+    objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+  }
+
+  private ProjectDeletedListener.Event projectDeletedEvent(String projectName) {
+    ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
+    when(event.getProjectName()).thenReturn(projectName);
+    return event;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index aa17c36..ab1ef80 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -20,11 +20,13 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Suppliers;
+import com.google.common.cache.CacheBuilder;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.data.RefUpdateAttribute;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.project.ProjectCache;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
@@ -45,12 +47,17 @@
 
   @Mock private MetricMaker metricMaker;
   @Mock private ProjectVersionLogger verLogger;
+  @Mock private ProjectCache projectCache;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
   private SubscriberMetrics metrics;
+  private ReplicationStatus replicationStatus;
 
   @Before
   public void setup() throws Exception {
-    metrics = new SubscriberMetrics(metricMaker, projectVersionRefUpdate, verLogger);
+    replicationStatus =
+        new ReplicationStatus(
+            CacheBuilder.newBuilder().build(), projectVersionRefUpdate, verLogger, projectCache);
+    metrics = new SubscriberMetrics(metricMaker, replicationStatus);
   }
 
   @Test
@@ -99,8 +106,9 @@
     Event refUpdateEventMessage = newRefUpdateEvent();
     metrics.updateReplicationStatusMetrics(refUpdateEventMessage);
 
-    assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isEqualTo(replicationLagSecs);
-    assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagSecs);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
 
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
@@ -118,8 +126,8 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
 
-    assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
-    assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -152,8 +160,9 @@
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
-    assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isEqualTo(replicationLagSecs);
-    assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagSecs);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
 
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
@@ -161,8 +170,8 @@
 
     metrics.updateReplicationStatusMetrics(projectDeleteEvent);
 
-    assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
-    assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
   }
 
   private ProjectDeletionReplicationSucceededEvent projectDeletionSuccess()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
new file mode 100644
index 0000000..f95668e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.http.HttpModule.LAG_ENDPOINT_SEGMENT;
+
+import com.gerritforge.gerrit.globalrefdb.validation.Log4jSharedRefLogger;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.RestResponse;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatus;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
+import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import java.io.IOException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Integration tests for the multi-site plugin's replication-lag HTTP endpoint. */
+@TestPlugin(
+    name = "multi-site",
+    sysModule =
+        "com.googlesource.gerrit.plugins.multisite.http.ReplicationStatusServletIT$TestModule",
+    httpModule = "com.googlesource.gerrit.plugins.multisite.http.HttpModule")
+public class ReplicationStatusServletIT extends LightweightPluginDaemonTest {
+  private static final String APPLICATION_JSON = "application/json";
+  private static final String LAG_ENDPOINT =
+      String.format("/plugins/multi-site/%s", LAG_ENDPOINT_SEGMENT);
+  private ReplicationStatus replicationStatus;
+
+  /** Sys module installing the plugin modules and bindings this test runs with. */
+  public static class TestModule extends AbstractModule {
+    @Override
+    protected void configure() {
+      install(new ForwarderModule());
+      install(new CacheModule());
+      install(new RouterModule());
+      install(new IndexModule());
+      install(new ReplicationStatusModule());
+      bind(SharedRefDbConfiguration.class)
+          .toInstance(new SharedRefDbConfiguration(new Config(), "multi-site"));
+      bind(ProjectVersionLogger.class).to(Log4jProjectVersionLogger.class);
+      bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // Fetched from the plugin's sys injector; the tests below feed lags into this instance.
+    this.replicationStatus = plugin.getSysInjector().getInstance(ReplicationStatus.class);
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldSucceedForAdminUsers() throws Exception {
+    RestResponse reply = adminRestSession.get(LAG_ENDPOINT);
+    reply.assertOK();
+    assertThat(reply.getHeader(CONTENT_TYPE)).contains(APPLICATION_JSON);
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldFailWhenUserHasNoAdminServerCapability() throws Exception {
+    RestResponse reply = userRestSession.get(LAG_ENDPOINT);
+    reply.assertForbidden();
+    assertThat(reply.getEntityContent()).contains("not permitted");
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldReturnCurrentProjectLag() throws Exception {
+    replicationStatus.doUpdateLag(Project.nameKey("foo"), 123L);
+
+    RestResponse reply = adminRestSession.get(LAG_ENDPOINT);
+    reply.assertOK();
+    assertThat(contentWithoutMagicJson(reply)).isEqualTo("{\"foo\":123}");
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldReturnProjectsOrderedDescendinglyByLag() throws Exception {
+    // Lags are registered unsorted; the expected body lists them largest first.
+    replicationStatus.doUpdateLag(Project.nameKey("bar"), 123L);
+    replicationStatus.doUpdateLag(Project.nameKey("foo"), 3L);
+    replicationStatus.doUpdateLag(Project.nameKey("baz"), 52300L);
+
+    RestResponse reply = adminRestSession.get(LAG_ENDPOINT);
+    reply.assertOK();
+    assertThat(contentWithoutMagicJson(reply)).isEqualTo("{\"baz\":52300,\"bar\":123,\"foo\":3}");
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldHonourTheLimitParameter() throws Exception {
+    replicationStatus.doUpdateLag(Project.nameKey("bar"), 1L);
+    replicationStatus.doUpdateLag(Project.nameKey("foo"), 2L);
+    replicationStatus.doUpdateLag(Project.nameKey("baz"), 3L);
+
+    RestResponse reply = adminRestSession.get(String.format("%s?limit=2", LAG_ENDPOINT));
+    reply.assertOK();
+    assertThat(contentWithoutMagicJson(reply)).isEqualTo("{\"baz\":3,\"foo\":2}");
+  }
+
+  /** Returns the response body minus its first {@code JSON_MAGIC.length} characters. */
+  private String contentWithoutMagicJson(RestResponse response) throws IOException {
+    return response.getEntityContent().substring(RestApiServlet.JSON_MAGIC.length);
+  }
+}