Merge branch 'stable-3.4'
* stable-3.4:
Delete replication metrics upon project deletion
Introduce persistent cache for replication-status
Fix issue with skipping events from high-availability
Introduce endpoint to query replication-lag
Extract replication status logic from subscriber metrics
Documentation: fix default replication type for local setup
Fix healthcheck dependency name in local setup script
Introduce local (docker) setup for e2e tests
Bump up localstack from 0.12.8 to 0.12.17.5
Change-Id: Iffff8d6c0689e804a7db00f8d0e888bbcd78f2ea
diff --git a/BUILD b/BUILD
index 39e1bc3..d7bd3d8 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
manifest_entries = [
"Gerrit-PluginName: multi-site",
"Gerrit-Module: com.googlesource.gerrit.plugins.multisite.PluginModule",
+ "Gerrit-HttpModule: com.googlesource.gerrit.plugins.multisite.http.HttpModule",
"Implementation-Title: multi-site plugin",
"Implementation-URL: https://review.gerrithub.io/admin/repos/GerritForge/plugins_multi-site",
],
diff --git a/README.md b/README.md
index 34f7709..79094d3 100644
--- a/README.md
+++ b/README.md
@@ -86,3 +86,8 @@
You also need to setup the Git-level replication between nodes, for more details
please refer to the
[replication plugin documentation](https://gerrit.googlesource.com/plugins/replication/+/refs/heads/master/src/main/resources/Documentation/config.md).
+
+# HTTP endpoints
+
+For information about available HTTP endpoints please refer to
+the [documentation](src/main/resources/Documentation/http-endpoints.md).
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index fca46d8..88b40d5 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -74,7 +74,7 @@
[--gerrit2-httpd-port] Gerrit Instance 2 http port; default 18081
[--gerrit2-sshd-port] Gerrit Instance 2 sshd port; default 49418
-[--replication-type] Options [file,ssh]; default ssh
+[--replication-type] Options [file,ssh]; default file
[--replication-ssh-user] SSH user for the replication plugin; default $(whoami)
[--replication-delay] Replication delay across the two instances in seconds
diff --git a/setup_local_env/docker-compose-kinesis.yaml b/setup_local_env/docker-compose-kinesis.yaml
index 15c609e..04429af 100644
--- a/setup_local_env/docker-compose-kinesis.yaml
+++ b/setup_local_env/docker-compose-kinesis.yaml
@@ -1,7 +1,7 @@
version: '3'
services:
kinesis:
- image: localstack/localstack:0.12.8
+ image: localstack/localstack:0.12.17.5
ports:
- "4566:4566"
- "4751:4751"
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 0115c36..4fee129 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -418,7 +418,7 @@
wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
ing"; exit 1; }
- wget $GERRIT_CI/plugin-healthcheck-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
+ wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
-O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
ing"; exit 1; }
else
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 81a0d8d..20aaef6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -22,6 +22,7 @@
import com.google.inject.Scopes;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
@@ -41,6 +42,7 @@
install(new BrokerForwarderModule());
listener().to(MultiSiteConsumerRunner.class);
+ install(new ReplicationStatusModule());
if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
listener().to(PluginStartup.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
new file mode 100644
index 0000000..7ce4606
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -0,0 +1,168 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.server.cache.CacheModule;
+import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
+import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.Module;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Singleton
+public class ReplicationStatus implements LifecycleListener, ProjectDeletedListener {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
+ static final String REPLICATION_STATUS_CACHE = "replication_status";
+
+ public static Module cacheModule() {
+ return new CacheModule() {
+ @Override
+ protected void configure() {
+ persist(REPLICATION_STATUS_CACHE, String.class, Long.class)
+ .version(1)
+ .keySerializer(StringCacheSerializer.INSTANCE)
+ .valueSerializer(new JavaCacheSerializer<>());
+ }
+ };
+ }
+
+ private final Map<String, Long> localVersionPerProject = new HashMap<>();
+ private final Cache<String, Long> cache;
+ private final ProjectVersionRefUpdate projectVersionRefUpdate;
+ private final ProjectVersionLogger verLogger;
+ private final ProjectCache projectCache;
+
+ @Inject
+ public ReplicationStatus(
+ @Named(REPLICATION_STATUS_CACHE) Cache<String, Long> cache,
+ ProjectVersionRefUpdate projectVersionRefUpdate,
+ ProjectVersionLogger verLogger,
+ ProjectCache projectCache) {
+ this.cache = cache;
+ this.projectVersionRefUpdate = projectVersionRefUpdate;
+ this.verLogger = verLogger;
+ this.projectCache = projectCache;
+ }
+
+ public Long getMaxLag() {
+ Collection<Long> lags = replicationStatusPerProject.values();
+ if (lags.isEmpty()) {
+ return 0L;
+ }
+ return Collections.max(lags);
+ }
+
+ public Map<String, Long> getReplicationLags(Integer limit) {
+ return replicationStatusPerProject.entrySet().stream()
+ .sorted((c1, c2) -> c2.getValue().compareTo(c1.getValue()))
+ .limit(limit)
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue,
+ (oldValue, newValue) -> oldValue,
+ LinkedHashMap::new));
+ }
+
+ public void updateReplicationLag(Project.NameKey projectName) {
+ Optional<Long> remoteVersion =
+ projectVersionRefUpdate.getProjectRemoteVersion(projectName.get());
+ Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
+ if (remoteVersion.isPresent() && localVersion.isPresent()) {
+ long lag = remoteVersion.get() - localVersion.get();
+
+ if (!localVersion.get().equals(localVersionPerProject.get(projectName.get()))
+ || lag != replicationStatusPerProject.get(projectName.get())) {
+ logger.atFine().log(
+ "Updated replication lag for project '%s' of %d sec(s) [local-ref=%d global-ref=%d]",
+ projectName, lag, localVersion.get(), remoteVersion.get());
+ doUpdateLag(projectName, lag);
+ localVersionPerProject.put(projectName.get(), localVersion.get());
+ verLogger.log(projectName, localVersion.get(), lag);
+ }
+ } else {
+ logger.atFine().log(
+ "Did not update replication lag for %s because the %s version is not defined",
+ projectName, localVersion.isPresent() ? "remote" : "local");
+ }
+ }
+
+ void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
+ Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
+
+ if (!localVersion.isPresent() && replicationStatusPerProject.containsKey(projectName.get())) {
+ cache.invalidate(projectName.get());
+ replicationStatusPerProject.remove(projectName.get());
+ localVersionPerProject.remove(projectName.get());
+ verLogger.logDeleted(projectName);
+ logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
+ }
+ }
+
+ @VisibleForTesting
+ public void doUpdateLag(Project.NameKey projectName, Long lag) {
+ cache.put(projectName.get(), lag);
+ replicationStatusPerProject.put(projectName.get(), lag);
+ }
+
+ @VisibleForTesting
+ Long getReplicationStatus(String projectName) {
+ return replicationStatusPerProject.get(projectName);
+ }
+
+ @VisibleForTesting
+ Long getLocalVersion(String projectName) {
+ return localVersionPerProject.get(projectName);
+ }
+
+ @Override
+ public void start() {
+ loadAllFromCache();
+ }
+
+ @Override
+ public void stop() {}
+
+ private void loadAllFromCache() {
+ Set<String> cachedProjects =
+ projectCache.all().stream().map(Project.NameKey::get).collect(Collectors.toSet());
+ replicationStatusPerProject.putAll(cache.getAllPresent(cachedProjects));
+ }
+
+ @Override
+ public void onProjectDeleted(Event event) {
+ removeProjectFromReplicationLagMetrics(Project.nameKey(event.getProjectName()));
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
new file mode 100644
index 0000000..791517f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
+
+public class ReplicationStatusModule extends LifecycleModule {
+ @Override
+ protected void configure() {
+ bind(ReplicationStatus.class).in(Scopes.SINGLETON);
+ install(ReplicationStatus.cacheModule());
+ listener().to(ReplicationStatus.class);
+ DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index f7d5f99..cce74f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,9 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.entities.Project;
import com.google.gerrit.metrics.Counter1;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.MetricMaker;
@@ -26,17 +24,10 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
-import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
@Singleton
public class SubscriberMetrics extends MultiSiteMetrics {
@@ -49,20 +40,12 @@
private final Counter1<String> subscriberSuccessCounter;
private final Counter1<String> subscriberFailureCounter;
- private final ProjectVersionLogger verLogger;
-
- private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
- private final Map<String, Long> localVersionPerProject = new HashMap<>();
-
- private ProjectVersionRefUpdate projectVersionRefUpdate;
+ private final ReplicationStatus replicationStatus;
@Inject
- public SubscriberMetrics(
- MetricMaker metricMaker,
- ProjectVersionRefUpdate projectVersionRefUpdate,
- ProjectVersionLogger verLogger) {
+ public SubscriberMetrics(MetricMaker metricMaker, ReplicationStatus replicationStatus) {
+ this.replicationStatus = replicationStatus;
- this.projectVersionRefUpdate = projectVersionRefUpdate;
this.subscriberSuccessCounter =
metricMaker.newCounter(
"multi_site/subscriber/subscriber_message_consumer_counter",
@@ -81,15 +64,7 @@
REPLICATION_LAG_SEC,
Long.class,
new Description("Replication lag (sec)").setGauge().setUnit(Description.Units.SECONDS),
- () -> {
- Collection<Long> lags = replicationStatusPerProject.values();
- if (lags.isEmpty()) {
- return 0L;
- }
- return Collections.max(lags);
- });
-
- this.verLogger = verLogger;
+ replicationStatus::getMaxLag);
}
public void incrementSubscriberConsumedMessage() {
@@ -107,55 +82,11 @@
|| event instanceof ReplicationScheduledEvent
|| event instanceof RefUpdatedEvent) {
ProjectEvent projectEvent = (ProjectEvent) event;
- updateReplicationLagMetrics(projectEvent.getProjectNameKey());
+ replicationStatus.updateReplicationLag(projectEvent.getProjectNameKey());
} else if (event instanceof ProjectDeletionReplicationSucceededEvent) {
ProjectDeletionReplicationSucceededEvent projectDeletion =
(ProjectDeletionReplicationSucceededEvent) event;
- removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
+ replicationStatus.removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
}
}
-
- private void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
- Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
-
- if (!localVersion.isPresent() && localVersionPerProject.containsKey(projectName.get())) {
- replicationStatusPerProject.remove(projectName.get());
- localVersionPerProject.remove(projectName.get());
- verLogger.logDeleted(projectName);
- logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
- }
- }
-
- private void updateReplicationLagMetrics(Project.NameKey projectName) {
- Optional<Long> remoteVersion =
- projectVersionRefUpdate.getProjectRemoteVersion(projectName.get());
- Optional<Long> localVersion = projectVersionRefUpdate.getProjectLocalVersion(projectName.get());
- if (remoteVersion.isPresent() && localVersion.isPresent()) {
- long lag = remoteVersion.get() - localVersion.get();
-
- if (!localVersion.get().equals(localVersionPerProject.get(projectName.get()))
- || lag != replicationStatusPerProject.get(projectName.get())) {
- logger.atFine().log(
- "Published replication lag metric for project '%s' of %d sec(s) [local-ref=%d global-ref=%d]",
- projectName, lag, localVersion.get(), remoteVersion.get());
- replicationStatusPerProject.put(projectName.get(), lag);
- localVersionPerProject.put(projectName.get(), localVersion.get());
- verLogger.log(projectName, localVersion.get(), lag);
- }
- } else {
- logger.atFine().log(
- "Did not publish replication lag metric for %s because the %s version is not defined",
- projectName, localVersion.isPresent() ? "remote" : "local");
- }
- }
-
- @VisibleForTesting
- Long getReplicationStatus(String projectName) {
- return replicationStatusPerProject.get(projectName);
- }
-
- @VisibleForTesting
- Long getLocalVersion(String projectName) {
- return localVersionPerProject.get(projectName);
- }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
index 329b5cb..c5706a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
@@ -15,9 +15,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder;
public abstract class ForwarderTask implements Runnable {
- private final Thread callerThread = Thread.currentThread();
+ private final String callerThreadName = Thread.currentThread().getName();
- public Thread getCallerThread() {
- return callerThread;
+ public String getCallerThreadName() {
+ return callerThreadName;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index ac4c38e..19936ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -35,7 +35,7 @@
}
protected boolean currentThreadBelongsToHighAvailabilityPlugin(ForwarderTask task) {
- String currentThreadName = task.getCallerThread().getName();
+ String currentThreadName = task.getCallerThreadName();
return currentThreadName.contains(HIGH_AVAILABILITY_PLUGIN)
|| currentThreadName.contains(HIGH_AVAILABILITY_FORWARDER)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
new file mode 100644
index 0000000..2ae2d9b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import com.google.inject.servlet.ServletModule;
+
+public class HttpModule extends ServletModule {
+
+ public static final String LAG_ENDPOINT_SEGMENT = "replication-lag";
+
+ @Override
+ protected void configureServlets() {
+ serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java
new file mode 100644
index 0000000..e2b3e18
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServlet.java
@@ -0,0 +1,84 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import static com.google.gerrit.server.permissions.GlobalPermission.ADMINISTRATE_SERVER;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatus;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Optional;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@Singleton
+public class ReplicationStatusServlet extends HttpServlet {
+ protected static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final String LIMIT_RESULT_PARAMETER = "limit";
+ private static final long serialVersionUID = 1L;
+ private static final Integer DEFAULT_LIMIT_RESULT_PARAMETER = 10;
+
+ private final Gson gson;
+ private final ReplicationStatus replicationStatus;
+ private final PermissionBackend permissionBackend;
+
+ @Inject
+ ReplicationStatusServlet(
+ Gson gson, ReplicationStatus replicationStatus, PermissionBackend permissionBackend) {
+ this.gson = gson;
+ this.replicationStatus = replicationStatus;
+ this.permissionBackend = permissionBackend;
+ }
+
+ @Override
+ protected void doGet(
+ HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse)
+ throws ServletException, IOException {
+ if (!permissionBackend.currentUser().testOrFalse(ADMINISTRATE_SERVER)) {
+ setResponse(
+ httpServletResponse,
+ HttpServletResponse.SC_FORBIDDEN,
+ String.format("%s permissions required. Operation not permitted", ADMINISTRATE_SERVER));
+ return;
+ }
+
+ int limitResult =
+ Optional.ofNullable(httpServletRequest.getParameter(LIMIT_RESULT_PARAMETER))
+ .map(Integer::parseInt)
+ .orElse(DEFAULT_LIMIT_RESULT_PARAMETER);
+
+ setResponse(
+ httpServletResponse,
+ HttpServletResponse.SC_OK,
+ gson.toJson(replicationStatus.getReplicationLags(limitResult)));
+ }
+
+ static void setResponse(HttpServletResponse httpResponse, int statusCode, String value)
+ throws IOException {
+ httpResponse.setContentType("application/json");
+ httpResponse.setStatus(statusCode);
+ PrintWriter writer = httpResponse.getWriter();
+ writer.print(new String(RestApiServlet.JSON_MAGIC));
+ writer.print(value);
+ }
+}
diff --git a/src/main/resources/Documentation/http-endpoints.md b/src/main/resources/Documentation/http-endpoints.md
new file mode 100644
index 0000000..ff56fdf
--- /dev/null
+++ b/src/main/resources/Documentation/http-endpoints.md
@@ -0,0 +1,42 @@
+HTTP endpoints
+=========================
+
+The @PLUGIN@ plugin also provides HTTP endpoints, as described here below:
+
+## replication-lag
+
+Admin users can query for the replication lag in order to understand what
+projects' replication is running behind and by how much (in milliseconds).
+
+The results are returned in a map ordered in descending order by the replication
+lag, so that the most behind projects are returned first.
+
+Whilst some lag information is also available as a metric (see
+the [documentation](./about.md#metrics)), this endpoint provides more
+information since it shows _which_ project is associated to _which specific lag_.
+
+You can query the endpoint (at the receiving end of the replication) as follows:
+
+```bash
+curl -v -XGET -u <admin> '<gerrit>/a/plugins/multi-site/replication-lag?[limit=LIMIT]'
+```
+
+Output example:
+
+```
+)]}'
+{
+ "All-Users": 62,
+ "bar": 13,
+ "foo": 0,
+ "some/other/project": 1451,
+ "baz": 6432
+}
+```
+
+Optionally the REST endpoint can receive the following additional arguments:
+
+* limit=LIMIT
+
+maximum number of projects to return
+*default:10*
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
new file mode 100644
index 0000000..aec9404
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -0,0 +1,161 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.server.project.ProjectCache;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import java.util.Optional;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplicationStatusTest {
+
+ @Mock private ProjectVersionLogger verLogger;
+ @Mock private ProjectCache projectCache;
+ @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
+ private ReplicationStatus objectUnderTest;
+ private Cache<String, Long> replicationStatusCache;
+
+ @Before
+ public void setup() throws Exception {
+ when(projectCache.all())
+ .thenReturn(
+ ImmutableSortedSet.of(Project.nameKey("projectA"), Project.nameKey("projectB")));
+ replicationStatusCache = CacheBuilder.newBuilder().build();
+ objectUnderTest =
+ new ReplicationStatus(
+ replicationStatusCache, projectVersionRefUpdate, verLogger, projectCache);
+ }
+
+ @Test
+ public void shouldPopulateLagsFromPersistedCacheOnStart() {
+ replicationStatusCache.put("projectA", 10L);
+ replicationStatusCache.put("projectB", 3L);
+
+ objectUnderTest.start();
+ assertThat(objectUnderTest.getMaxLag()).isEqualTo(10L);
+ }
+
+ @Test
+ public void shouldBeAbleToUpdatePersistedCacheValues() {
+ replicationStatusCache.put("projectA", 3L);
+
+ objectUnderTest.start();
+
+ objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
+ assertThat(objectUnderTest.getMaxLag()).isEqualTo(20L);
+ }
+
+ @Test
+ public void shouldCombinePersistedProjectsWithNewEntries() {
+ replicationStatusCache.put("projectA", 3L);
+ objectUnderTest.start();
+
+ objectUnderTest.doUpdateLag(Project.nameKey("projectB"), 20L);
+
+ assertThat(objectUnderTest.getReplicationLags(2).keySet())
+ .containsExactly("projectA", "projectB");
+ }
+
+ @Test
+ public void shouldUpdatePersistedCacheWhenUpdatingLagValue() {
+ objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
+
+ assertThat(replicationStatusCache.getIfPresent("projectA")).isEqualTo(20L);
+ }
+
+ @Test
+ public void shouldRemoveProjectFromPersistedCache() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+ }
+
+ @Test
+ public void shouldRemoveProjectFromReplicationLags() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).isEmpty();
+ }
+
+ @Test
+ public void shouldNotRemoveProjectFromReplicationLagsIfLocalVersionStillExists() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(System.currentTimeMillis()));
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+ }
+
+ @Test
+ public void shouldNotEvictFromPersistentCacheIfLocalVersionStillExists() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(System.currentTimeMillis()));
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(lag);
+ }
+
+ private void setupReplicationLag(String projectName, long lag) {
+ long currentVersion = System.currentTimeMillis();
+ long newVersion = currentVersion + lag;
+ replicationStatusCache.put(projectName, 3L);
+ when(projectVersionRefUpdate.getProjectRemoteVersion(projectName))
+ .thenReturn(Optional.of(newVersion));
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(currentVersion));
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+ }
+
+ private ProjectDeletedListener.Event projectDeletedEvent(String projectName) {
+ ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
+ when(event.getProjectName()).thenReturn(projectName);
+ return event;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index aa17c36..ab1ef80 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -20,11 +20,13 @@
import static org.mockito.Mockito.when;
import com.google.common.base.Suppliers;
+import com.google.common.cache.CacheBuilder;
import com.google.gerrit.entities.Project;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.data.RefUpdateAttribute;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.project.ProjectCache;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
@@ -45,12 +47,17 @@
@Mock private MetricMaker metricMaker;
@Mock private ProjectVersionLogger verLogger;
+ @Mock private ProjectCache projectCache;
@Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
private SubscriberMetrics metrics;
+ private ReplicationStatus replicationStatus;
@Before
public void setup() throws Exception {
- metrics = new SubscriberMetrics(metricMaker, projectVersionRefUpdate, verLogger);
+ replicationStatus =
+ new ReplicationStatus(
+ CacheBuilder.newBuilder().build(), projectVersionRefUpdate, verLogger, projectCache);
+ metrics = new SubscriberMetrics(metricMaker, replicationStatus);
}
@Test
@@ -99,8 +106,9 @@
Event refUpdateEventMessage = newRefUpdateEvent();
metrics.updateReplicationStatusMetrics(refUpdateEventMessage);
- assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isEqualTo(replicationLagSecs);
- assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+ .isEqualTo(replicationLagSecs);
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
.thenReturn(Optional.empty());
@@ -118,8 +126,8 @@
when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
.thenReturn(Optional.empty());
- assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
- assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
metrics.updateReplicationStatusMetrics(eventMessage);
@@ -152,8 +160,9 @@
metrics.updateReplicationStatusMetrics(eventMessage);
- assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isEqualTo(replicationLagSecs);
- assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+ .isEqualTo(replicationLagSecs);
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
.thenReturn(Optional.empty());
@@ -161,8 +170,8 @@
metrics.updateReplicationStatusMetrics(projectDeleteEvent);
- assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
- assertThat(metrics.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
}
private ProjectDeletionReplicationSucceededEvent projectDeletionSuccess()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
new file mode 100644
index 0000000..f95668e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.http;
+
+import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.http.HttpModule.LAG_ENDPOINT_SEGMENT;
+
+import com.gerritforge.gerrit.globalrefdb.validation.Log4jSharedRefLogger;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.RestResponse;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatus;
+import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
+import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import java.io.IOException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+@TestPlugin(
+ name = "multi-site",
+ sysModule =
+ "com.googlesource.gerrit.plugins.multisite.http.ReplicationStatusServletIT$TestModule",
+ httpModule = "com.googlesource.gerrit.plugins.multisite.http.HttpModule")
+public class ReplicationStatusServletIT extends LightweightPluginDaemonTest {
+ private static final String APPLICATION_JSON = "application/json";
+ private static final String LAG_ENDPOINT =
+ String.format("/plugins/multi-site/%s", LAG_ENDPOINT_SEGMENT);
+ private ReplicationStatus replicationStatus;
+
+ public static class TestModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ install(new ForwarderModule());
+ install(new CacheModule());
+ install(new RouterModule());
+ install(new IndexModule());
+ install(new ReplicationStatusModule());
+ SharedRefDbConfiguration sharedRefDbConfig =
+ new SharedRefDbConfiguration(new Config(), "multi-site");
+ bind(SharedRefDbConfiguration.class).toInstance(sharedRefDbConfig);
+ bind(ProjectVersionLogger.class).to(Log4jProjectVersionLogger.class);
+ bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ replicationStatus = plugin.getSysInjector().getInstance(ReplicationStatus.class);
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldSucceedForAdminUsers() throws Exception {
+ RestResponse result = adminRestSession.get(LAG_ENDPOINT);
+
+ result.assertOK();
+ assertThat(result.getHeader(CONTENT_TYPE)).contains(APPLICATION_JSON);
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldFailWhenUserHasNoAdminServerCapability() throws Exception {
+ RestResponse result = userRestSession.get(LAG_ENDPOINT);
+ result.assertForbidden();
+ assertThat(result.getEntityContent()).contains("not permitted");
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldReturnCurrentProjectLag() throws Exception {
+ replicationStatus.doUpdateLag(Project.nameKey("foo"), 123L);
+
+ RestResponse result = adminRestSession.get(LAG_ENDPOINT);
+
+ result.assertOK();
+ assertThat(contentWithoutMagicJson(result)).isEqualTo("{\"foo\":123}");
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldReturnProjectsOrderedDescendinglyByLag() throws Exception {
+ replicationStatus.doUpdateLag(Project.nameKey("bar"), 123L);
+ replicationStatus.doUpdateLag(Project.nameKey("foo"), 3L);
+ replicationStatus.doUpdateLag(Project.nameKey("baz"), 52300L);
+
+ RestResponse result = adminRestSession.get(LAG_ENDPOINT);
+
+ result.assertOK();
+ assertThat(contentWithoutMagicJson(result)).isEqualTo("{\"baz\":52300,\"bar\":123,\"foo\":3}");
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldHonourTheLimitParameter() throws Exception {
+ replicationStatus.doUpdateLag(Project.nameKey("bar"), 1L);
+ replicationStatus.doUpdateLag(Project.nameKey("foo"), 2L);
+ replicationStatus.doUpdateLag(Project.nameKey("baz"), 3L);
+
+ RestResponse result = adminRestSession.get(String.format("%s?limit=2", LAG_ENDPOINT));
+
+ result.assertOK();
+ assertThat(contentWithoutMagicJson(result)).isEqualTo("{\"baz\":3,\"foo\":2}");
+ }
+
+ private String contentWithoutMagicJson(RestResponse response) throws IOException {
+ return response.getEntityContent().substring(RestApiServlet.JSON_MAGIC.length);
+ }
+}