Introduce E2E fetch REST-API metrics
The E2E replication through the fetch REST-API
needs to be calculated from the client originating the call
where the initial ref-update happens.
Introduce a new end_2_end metric to track this latency.
When the pull replication is executed asynchronously,
the client originating the call propagates the timestamp
in an HTTP header (X-StartTimeNanos) and propagated
to the receiver side, which will use the initial time for
recording the final E2E replication time at the receiver
side.
Also add the pull-replication async integration tests
that were completely missing before: testing with async
replication is key for demonstrating that the E2E metrics
are working as expected.
Consequently, the TEST_REPLICATION_DELAY is reduced to
1 second for preventing the whole test suite to fail
for timeout.
TODO: Additional assertion on E2E metrics in sync and async
modes.
Change-Id: Ieb29e3475dc0938fe7700007b7d05a630c3c927f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
index bcd86d7..42310ff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -21,6 +21,7 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.transport.URIish;
@@ -91,7 +92,7 @@
for (Source cfg : sources.getAll()) {
if (cfg.wouldFetchProject(project)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
- cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType);
+ cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType, Optional.empty());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index cee52b5..cdf573f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -32,6 +32,7 @@
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
@@ -69,7 +70,8 @@
static final String ID_KEY = "fetchOneId";
interface Factory {
- FetchOne create(Project.NameKey d, URIish u);
+ FetchOne create(
+ Project.NameKey d, URIish u, Optional<PullReplicationApiRequestMetrics> apiRequestMetrics);
}
private final GitRepositoryManager gitManager;
@@ -94,6 +96,7 @@
private final FetchReplicationMetrics metrics;
private final AtomicBoolean canceledWhileRunning;
private final FetchFactory fetchFactory;
+ private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
@Inject
FetchOne(
@@ -106,7 +109,8 @@
FetchReplicationMetrics m,
FetchFactory fetchFactory,
@Assisted Project.NameKey d,
- @Assisted URIish u) {
+ @Assisted URIish u,
+ @Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
gitManager = grm;
pool = s;
config = c.getRemoteConfig();
@@ -122,6 +126,7 @@
canceledWhileRunning = new AtomicBoolean(false);
this.fetchFactory = fetchFactory;
maxRetries = s.getMaxRetries();
+ this.apiRequestMetrics = apiRequestMetrics;
}
@Override
@@ -299,12 +304,17 @@
git = gitManager.openRepository(projectName);
runImpl();
long elapsed = NANOSECONDS.toMillis(context.stop());
+ Optional<Long> elapsedEnd2End =
+ apiRequestMetrics
+ .flatMap(metrics -> metrics.stop(config.getName()))
+ .map(NANOSECONDS::toMillis);
repLog.info(
- "Replication from {} completed in {}ms, {}ms delay, {} retries",
+ "Replication from {} completed in {}ms, {}ms delay, {} retries{}",
uri,
elapsed,
delay,
- retryCount);
+ retryCount,
+ elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
} catch (RepositoryNotFoundException e) {
stateLog.error(
"Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
index e952252..22bb073 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
@@ -23,10 +23,12 @@
import com.google.gerrit.server.logging.PluginMetadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import java.util.concurrent.TimeUnit;
@Singleton
public class FetchReplicationMetrics {
private final Timer1<String> executionTime;
+ private final Timer1<String> end2EndExecutionTime;
private final Histogram1<String> executionDelay;
private final Histogram1<String> executionRetries;
@@ -34,11 +36,11 @@
FetchReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
Field<String> SOURCE_FIELD =
Field.ofString(
- "source",
+ "pull_replication",
(metadataBuilder, fieldValue) ->
metadataBuilder
.pluginName(pluginName)
- .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+ .addPluginMetadata(PluginMetadata.create("pull_replication", fieldValue)))
.build();
executionTime =
@@ -49,6 +51,14 @@
.setUnit(Description.Units.MILLISECONDS),
SOURCE_FIELD);
+ end2EndExecutionTime =
+ metricMaker.newTimer(
+ "replication_end_2_end_latency",
+ new Description("Time spent end-2-end fetching from remote source.")
+ .setCumulative()
+ .setUnit(Description.Units.MILLISECONDS),
+ SOURCE_FIELD);
+
executionDelay =
metricMaker.newHistogram(
"replication_delay",
@@ -77,6 +87,26 @@
}
/**
+ * Start the end-to-end replication latency timer from a source.
+ *
+ * @param name the source name.
+ * @return the timer context.
+ */
+ public Timer1.Context<String> startEnd2End(String name) {
+ return end2EndExecutionTime.start(name);
+ }
+
+ /**
+ * Record the end-to-end replication latency timer from a source.
+ *
+ * @param name the source name.
+ * @param metricNanos the timer value in nanos
+ */
+ public void recordEnd2End(String name, long metricNanos) {
+ end2EndExecutionTime.record(name, metricNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Record the replication delay and retry metrics for a source.
*
* @param name the source name.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 6393c08..431255c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -77,6 +77,7 @@
private ExcludedRefsFilter refsFilter;
private RevisionReader revisionReader;
private final ApplyObjectMetrics applyObjectMetrics;
+ private final FetchReplicationMetrics fetchMetrics;
@Inject
ReplicationQueue(
@@ -87,7 +88,8 @@
FetchApiClient.Factory fetchClientFactory,
ExcludedRefsFilter refsFilter,
RevisionReader revReader,
- ApplyObjectMetrics applyObjectMetrics) {
+ ApplyObjectMetrics applyObjectMetrics,
+ FetchReplicationMetrics fetchMetrics) {
workQueue = wq;
dispatcher = dis;
sources = rd;
@@ -97,6 +99,7 @@
this.refsFilter = refsFilter;
this.revisionReader = revReader;
this.applyObjectMetrics = applyObjectMetrics;
+ this.fetchMetrics = fetchMetrics;
}
@Override
@@ -349,7 +352,18 @@
try {
URIish uri = new URIish(apiUrl);
FetchApiClient fetchClient = fetchClientFactory.create(source);
- HttpResult result = fetchClient.callFetch(project, refName, uri);
+ repLog.info("Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
+ Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName());
+ HttpResult result = fetchClient.callFetch(project, refName, uri, timer.getStartTime());
+ long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop());
+ repLog.info(
+ "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:"
+ + " {} - time:{} ms",
+ apiUrl,
+ project,
+ refName,
+ result,
+ elapsedMs);
if (isProjectMissing(result, project) && source.isCreateMissingRepositories()) {
result = initProject(project, uri, fetchClient, result);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index ddefa47..4ae3f7d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -61,6 +61,7 @@
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetchValidator;
@@ -392,9 +393,10 @@
Project.NameKey project,
String ref,
ReplicationState state,
- ReplicationType replicationType) {
+ ReplicationType replicationType,
+ Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
URIish uri = getURI(project);
- return schedule(project, ref, uri, state, replicationType);
+ return schedule(project, ref, uri, state, replicationType, apiRequestMetrics);
}
public Future<?> schedule(
@@ -402,7 +404,8 @@
String ref,
URIish uri,
ReplicationState state,
- ReplicationType replicationType) {
+ ReplicationType replicationType,
+ Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
if (!shouldReplicate(project, ref, state)) {
@@ -438,7 +441,7 @@
FetchOne e = pending.get(uri);
Future<?> f = CompletableFuture.completedFuture(null);
if (e == null) {
- e = opFactory.create(project, uri);
+ e = opFactory.create(project, uri, apiRequestMetrics);
addRef(e, ref);
e.addState(ref, state);
pending.put(uri, e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 2767f1d..48964db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -101,7 +101,10 @@
@SuppressWarnings("unchecked")
WorkQueue.Task<Void> task =
(WorkQueue.Task<Void>)
- workQueue.getDefaultQueue().submit(new FetchJob(command, project, input));
+ workQueue
+ .getDefaultQueue()
+ .submit(
+ new FetchJob(command, project, input, PullReplicationApiRequestMetrics.get()));
Optional<String> url =
urlFormatter
.get()
@@ -117,17 +120,23 @@
private FetchCommand command;
private Project.NameKey project;
private FetchAction.Input input;
+ private final PullReplicationApiRequestMetrics apiRequestMetrics;
- public FetchJob(FetchCommand command, Project.NameKey project, FetchAction.Input input) {
+ public FetchJob(
+ FetchCommand command,
+ Project.NameKey project,
+ FetchAction.Input input,
+ PullReplicationApiRequestMetrics apiRequestMetrics) {
this.command = command;
this.project = project;
this.input = input;
+ this.apiRequestMetrics = apiRequestMetrics;
}
@Override
public void run() {
try {
- command.fetchAsync(project, input.label, input.refName);
+ command.fetchAsync(project, input.label, input.refName, apiRequestMetrics);
} catch (InterruptedException
| ExecutionException
| RemoteConfigurationMissingException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index e1ac9ac..3a502ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -54,19 +54,28 @@
this.eventDispatcher = eventDispatcher;
}
- public void fetchAsync(Project.NameKey name, String label, String refName)
+ public void fetchAsync(
+ Project.NameKey name,
+ String label,
+ String refName,
+ PullReplicationApiRequestMetrics apiRequestMetrics)
throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
TimeoutException {
- fetch(name, label, refName, ASYNC);
+ fetch(name, label, refName, ASYNC, Optional.of(apiRequestMetrics));
}
public void fetchSync(Project.NameKey name, String label, String refName)
throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
TimeoutException {
- fetch(name, label, refName, SYNC);
+ fetch(name, label, refName, SYNC, Optional.empty());
}
- private void fetch(Project.NameKey name, String label, String refName, ReplicationType fetchType)
+ private void fetch(
+ Project.NameKey name,
+ String label,
+ String refName,
+ ReplicationType fetchType,
+ Optional<PullReplicationApiRequestMetrics> apiRequestMetrics)
throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
TimeoutException {
ReplicationState state =
@@ -82,7 +91,7 @@
try {
state.markAllFetchTasksScheduled();
- Future<?> future = source.get().schedule(name, refName, state, fetchType);
+ Future<?> future = source.get().schedule(name, refName, state, fetchType, apiRequestMetrics);
future.get(source.get().getTimeout(), TimeUnit.SECONDS);
} catch (ExecutionException
| IllegalStateException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
index b2ef28d..b140cb4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
@@ -38,5 +38,9 @@
} else {
serveRegex("/init-project/.*$").with(ProjectInitializationAction.class);
}
+
+ DynamicSet.bind(binder(), AllRequestFilter.class)
+ .to(PullReplicationApiMetricsFilter.class)
+ .in(Scopes.SINGLETON);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java
new file mode 100644
index 0000000..3858db2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.httpd.AllRequestFilter;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@Singleton
+public class PullReplicationApiMetricsFilter extends AllRequestFilter {
+ private final Provider<PullReplicationApiRequestMetrics> apiRequestMetrics;
+
+ @Inject
+ public PullReplicationApiMetricsFilter(
+ Provider<PullReplicationApiRequestMetrics> apiRequestMetrics) {
+ this.apiRequestMetrics = apiRequestMetrics;
+ }
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+ if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
+ chain.doFilter(request, response);
+ return;
+ }
+
+ PullReplicationApiRequestMetrics requestMetrics = apiRequestMetrics.get();
+ requestMetrics.start((HttpServletRequest) request);
+ PullReplicationApiRequestMetrics.set(requestMetrics);
+
+ chain.doFilter(request, response);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
new file mode 100644
index 0000000..8a97901
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.FetchReplicationMetrics;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.http.HttpServletRequest;
+
+public class PullReplicationApiRequestMetrics {
+ private static final ThreadLocal<PullReplicationApiRequestMetrics> localApiRequestMetrics =
+ new ThreadLocal<>();
+
+ public static final String HTTP_HEADER_X_START_TIME_NANOS = "X-StartTimeNanos";
+
+ private Optional<Long> startTimeNanos;
+ private final AtomicBoolean initialised = new AtomicBoolean();
+ private final FetchReplicationMetrics metrics;
+
+ public static PullReplicationApiRequestMetrics get() {
+ return localApiRequestMetrics.get();
+ }
+
+ public static void set(PullReplicationApiRequestMetrics metrics) {
+ localApiRequestMetrics.set(metrics);
+ }
+
+ @Inject
+ public PullReplicationApiRequestMetrics(FetchReplicationMetrics metrics) {
+ this.metrics = metrics;
+ }
+
+ public void start(HttpServletRequest req) {
+ if (!initialised.compareAndSet(false, true)) {
+ throw new IllegalStateException("PullReplicationApiRequestMetrics already initialised");
+ }
+
+ startTimeNanos =
+ Optional.ofNullable(req.getHeader(HTTP_HEADER_X_START_TIME_NANOS))
+ .map(Long::parseLong)
+ /* Adjust with System.nanoTime() for preventing negative execution times
+ * due to a clock skew between the client and the server timestamp.
+ */
+ .map(nanoTime -> Math.min(System.nanoTime(), nanoTime));
+ }
+
+ public Optional<Long> stop(String replicationSourceName) {
+ return startTimeNanos.map(
+ start -> {
+ long elapsed = System.nanoTime() - start;
+ metrics.recordEnd2End(replicationSourceName, elapsed);
+ return elapsed;
+ });
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 476a35b..01b6f1d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -27,9 +27,15 @@
FetchApiClient create(Source source);
}
- HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+ HttpResult callFetch(
+ Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
throws ClientProtocolException, IOException;
+ default HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+ throws ClientProtocolException, IOException {
+ return callFetch(project, refName, targetUri, System.nanoTime());
+ }
+
HttpResult initProject(Project.NameKey project, URIish uri) throws IOException;
HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index f2f57cd..4db4ceb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -32,6 +32,7 @@
import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
@@ -95,7 +96,8 @@
* @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callFetch(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
*/
@Override
- public HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+ public HttpResult callFetch(
+ Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
throws ClientProtocolException, IOException {
String url =
String.format(
@@ -110,6 +112,9 @@
instanceLabel, refName, callAsync),
StandardCharsets.UTF_8));
post.addHeader(new BasicHeader("Content-Type", "application/json"));
+ post.addHeader(
+ PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS,
+ Long.toString(startTimeNanos));
return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
new file mode 100644
index 0000000..58be58e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+ name = "pull-replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+ httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationAsyncIT extends PullReplicationIT {
+ @Inject private SitePaths sitePaths;
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ FileBasedConfig config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ config.setString("replication", null, "syncRefs", "^$");
+ config.save();
+
+ super.setUpTestPlugin(true);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index c6d32bf..7fcff10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -50,6 +50,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@@ -78,11 +79,12 @@
@UseLocalDisk
@TestPlugin(
name = "pull-replication",
- sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule")
+ sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+ httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
public class PullReplicationIT extends LightweightPluginDaemonTest {
private static final Optional<String> ALL_PROJECTS = Optional.empty();
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final int TEST_REPLICATION_DELAY = 60;
+ private static final int TEST_REPLICATION_DELAY = 1;
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000);
private static final String TEST_REPLICATION_SUFFIX = "suffix1";
private static final String TEST_REPLICATION_REMOTE = "remote1";
@@ -96,10 +98,17 @@
@Override
public void setUpTestPlugin() throws Exception {
+ setUpTestPlugin(false);
+ }
+
+ protected void setUpTestPlugin(boolean loadExisting) throws Exception {
gitPath = sitePaths.site_path.resolve("git");
- config =
- new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ File configFile = sitePaths.etc_dir.resolve("replication.config").toFile();
+ config = new FileBasedConfig(configFile, FS.DETECTED);
+ if (loadExisting && configFile.exists()) {
+ config.load();
+ }
setReplicationSource(
TEST_REPLICATION_REMOTE,
TEST_REPLICATION_SUFFIX,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 08d7e5a..56b6ad1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -81,6 +82,7 @@
@Mock RevisionData revisionData;
@Mock HttpResult httpResult;
ApplyObjectMetrics applyObjectMetrics;
+ FetchReplicationMetrics fetchMetrics;
@Captor ArgumentCaptor<String> stringCaptor;
@Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor;
@@ -108,15 +110,24 @@
when(fetchClientFactory.create(any())).thenReturn(fetchRestApiClient);
when(fetchRestApiClient.callSendObject(any(), anyString(), anyBoolean(), any(), any()))
.thenReturn(httpResult);
- when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(httpResult);
+ when(fetchRestApiClient.callFetch(any(), anyString(), any(), anyLong())).thenReturn(httpResult);
when(httpResult.isSuccessful()).thenReturn(true);
when(httpResult.isProjectMissing(any())).thenReturn(false);
applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
+ fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
objectUnderTest =
new ReplicationQueue(
- wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader, applyObjectMetrics);
+ wq,
+ rd,
+ dis,
+ sl,
+ fetchClientFactory,
+ refsFilter,
+ revReader,
+ applyObjectMetrics,
+ fetchMetrics);
}
@Test
@@ -173,7 +184,7 @@
objectUnderTest.onGitReferenceUpdated(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
}
@Test
@@ -186,7 +197,7 @@
objectUnderTest.onGitReferenceUpdated(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
}
@Test
@@ -202,7 +213,7 @@
objectUnderTest.onGitReferenceUpdated(event);
- verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+ verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
}
@Test
@@ -248,7 +259,15 @@
objectUnderTest =
new ReplicationQueue(
- wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader, applyObjectMetrics);
+ wq,
+ rd,
+ dis,
+ sl,
+ fetchClientFactory,
+ refsFilter,
+ revReader,
+ applyObjectMetrics,
+ fetchMetrics);
Event event = new TestEvent("refs/multi-site/version");
objectUnderTest.onGitReferenceUpdated(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index e1ad565..9af2d10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -35,6 +35,7 @@
import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
import java.net.URISyntaxException;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -56,6 +57,7 @@
@Mock Source source;
@Mock SourcesCollection sources;
@Mock DynamicItem<EventDispatcher> eventDispatcher;
+ @Mock PullReplicationApiRequestMetrics apiRequestMetrics;
@SuppressWarnings("rawtypes")
@Mock
@@ -76,7 +78,7 @@
when(fetchReplicationStateFactory.create(any())).thenReturn(state);
when(source.getRemoteConfigName()).thenReturn(label);
when(sources.getAll()).thenReturn(Lists.newArrayList(source));
- when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any()))
+ when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
objectUnderTest =
new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources, eventDispatcher);
@@ -88,16 +90,18 @@
TimeoutException {
objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
- verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+ verify(source, times(1))
+ .schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty());
}
@Test
public void shouldScheduleRefFetchWithDelay()
throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
TimeoutException {
- objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH);
+ objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH, apiRequestMetrics);
- verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC);
+ verify(source, times(1))
+ .schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC, Optional.of(apiRequestMetrics));
}
@Test
@@ -106,7 +110,8 @@
TimeoutException {
objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
- verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+ verify(source, times(1))
+ .schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty());
verify(state, times(1)).markAllFetchTasksScheduled();
}
@@ -123,7 +128,8 @@
public void shouldUpdateStateWhenInterruptedException()
throws InterruptedException, ExecutionException, TimeoutException {
when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new InterruptedException());
- when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+ when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+ .thenReturn(future);
InterruptedException e =
assertThrows(
@@ -138,7 +144,8 @@
throws InterruptedException, ExecutionException, TimeoutException {
when(future.get(anyLong(), eq(TimeUnit.SECONDS)))
.thenThrow(new ExecutionException(new Exception()));
- when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+ when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+ .thenReturn(future);
ExecutionException e =
assertThrows(
@@ -152,7 +159,8 @@
public void shouldUpdateStateWhenTimeoutException()
throws InterruptedException, ExecutionException, TimeoutException {
when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new TimeoutException());
- when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+ when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+ .thenReturn(future);
TimeoutException e =
assertThrows(