Introduce E2E fetch REST-API metrics

The E2E replication through the fetch REST-API
needs to be calculated from the client originating the call
where the initial ref-update happens.

Introduce a new end_2_end metric to track this latency.

When the pull replication is executed asynchronously,
the client originating the call propagates the timestamp
in an HTTP header (X-StartTimeNanos) and propagated
to the receiver side, which will use the initial time for
recording the final E2E replication time at the receiver
side.

Also add the pull-replication async integration tests
that were completely missing before: testing with async
replication is key for demonstrating that the E2E metrics
are working as expected.
Consequently, the TEST_REPLICATION_DELAY is reduced to
1 second for preventing the whole test suite to fail
for timeout.

TODO: Additional assertion on E2E metrics in sync and async
modes.

Change-Id: Ieb29e3475dc0938fe7700007b7d05a630c3c927f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
index bcd86d7..42310ff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -21,6 +21,7 @@
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.Optional;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.transport.URIish;
@@ -91,7 +92,7 @@
     for (Source cfg : sources.getAll()) {
       if (cfg.wouldFetchProject(project)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType);
+          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType, Optional.empty());
         }
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index cee52b5..cdf573f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -32,6 +32,7 @@
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
@@ -69,7 +70,8 @@
   static final String ID_KEY = "fetchOneId";
 
   interface Factory {
-    FetchOne create(Project.NameKey d, URIish u);
+    FetchOne create(
+        Project.NameKey d, URIish u, Optional<PullReplicationApiRequestMetrics> apiRequestMetrics);
   }
 
   private final GitRepositoryManager gitManager;
@@ -94,6 +96,7 @@
   private final FetchReplicationMetrics metrics;
   private final AtomicBoolean canceledWhileRunning;
   private final FetchFactory fetchFactory;
+  private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
 
   @Inject
   FetchOne(
@@ -106,7 +109,8 @@
       FetchReplicationMetrics m,
       FetchFactory fetchFactory,
       @Assisted Project.NameKey d,
-      @Assisted URIish u) {
+      @Assisted URIish u,
+      @Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
     gitManager = grm;
     pool = s;
     config = c.getRemoteConfig();
@@ -122,6 +126,7 @@
     canceledWhileRunning = new AtomicBoolean(false);
     this.fetchFactory = fetchFactory;
     maxRetries = s.getMaxRetries();
+    this.apiRequestMetrics = apiRequestMetrics;
   }
 
   @Override
@@ -299,12 +304,17 @@
       git = gitManager.openRepository(projectName);
       runImpl();
       long elapsed = NANOSECONDS.toMillis(context.stop());
+      Optional<Long> elapsedEnd2End =
+          apiRequestMetrics
+              .flatMap(metrics -> metrics.stop(config.getName()))
+              .map(NANOSECONDS::toMillis);
       repLog.info(
-          "Replication from {} completed in {}ms, {}ms delay, {} retries",
+          "Replication from {} completed in {}ms, {}ms delay, {} retries{}",
           uri,
           elapsed,
           delay,
-          retryCount);
+          retryCount,
+          elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
     } catch (RepositoryNotFoundException e) {
       stateLog.error(
           "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
index e952252..22bb073 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
@@ -23,10 +23,12 @@
 import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import java.util.concurrent.TimeUnit;
 
 @Singleton
 public class FetchReplicationMetrics {
   private final Timer1<String> executionTime;
+  private final Timer1<String> end2EndExecutionTime;
   private final Histogram1<String> executionDelay;
   private final Histogram1<String> executionRetries;
 
@@ -34,11 +36,11 @@
   FetchReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
     Field<String> SOURCE_FIELD =
         Field.ofString(
-                "source",
+                "pull_replication",
                 (metadataBuilder, fieldValue) ->
                     metadataBuilder
                         .pluginName(pluginName)
-                        .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+                        .addPluginMetadata(PluginMetadata.create("pull_replication", fieldValue)))
             .build();
 
     executionTime =
@@ -49,6 +51,14 @@
                 .setUnit(Description.Units.MILLISECONDS),
             SOURCE_FIELD);
 
+    end2EndExecutionTime =
+        metricMaker.newTimer(
+            "replication_end_2_end_latency",
+            new Description("Time spent end-2-end fetching from remote source.")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SOURCE_FIELD);
+
     executionDelay =
         metricMaker.newHistogram(
             "replication_delay",
@@ -77,6 +87,26 @@
   }
 
   /**
+   * Start the end-to-end replication latency timer from a source.
+   *
+   * @param name the source name.
+   * @return the timer context.
+   */
+  public Timer1.Context<String> startEnd2End(String name) {
+    return end2EndExecutionTime.start(name);
+  }
+
+  /**
+   * Record the end-to-end replication latency timer from a source.
+   *
+   * @param name the source name.
+   * @param metricNanos the timer value in nanos
+   */
+  public void recordEnd2End(String name, long metricNanos) {
+    end2EndExecutionTime.record(name, metricNanos, TimeUnit.NANOSECONDS);
+  }
+
+  /**
    * Record the replication delay and retry metrics for a source.
    *
    * @param name the source name.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 6393c08..431255c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -77,6 +77,7 @@
   private ExcludedRefsFilter refsFilter;
   private RevisionReader revisionReader;
   private final ApplyObjectMetrics applyObjectMetrics;
+  private final FetchReplicationMetrics fetchMetrics;
 
   @Inject
   ReplicationQueue(
@@ -87,7 +88,8 @@
       FetchApiClient.Factory fetchClientFactory,
       ExcludedRefsFilter refsFilter,
       RevisionReader revReader,
-      ApplyObjectMetrics applyObjectMetrics) {
+      ApplyObjectMetrics applyObjectMetrics,
+      FetchReplicationMetrics fetchMetrics) {
     workQueue = wq;
     dispatcher = dis;
     sources = rd;
@@ -97,6 +99,7 @@
     this.refsFilter = refsFilter;
     this.revisionReader = revReader;
     this.applyObjectMetrics = applyObjectMetrics;
+    this.fetchMetrics = fetchMetrics;
   }
 
   @Override
@@ -349,7 +352,18 @@
         try {
           URIish uri = new URIish(apiUrl);
           FetchApiClient fetchClient = fetchClientFactory.create(source);
-          HttpResult result = fetchClient.callFetch(project, refName, uri);
+          repLog.info("Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
+          Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName());
+          HttpResult result = fetchClient.callFetch(project, refName, uri, timer.getStartTime());
+          long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop());
+          repLog.info(
+              "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:"
+                  + " {} - time:{} ms",
+              apiUrl,
+              project,
+              refName,
+              result,
+              elapsedMs);
           if (isProjectMissing(result, project) && source.isCreateMissingRepositories()) {
             result = initProject(project, uri, fetchClient, result);
           }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index ddefa47..4ae3f7d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -61,6 +61,7 @@
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
 import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetchValidator;
@@ -392,9 +393,10 @@
       Project.NameKey project,
       String ref,
       ReplicationState state,
-      ReplicationType replicationType) {
+      ReplicationType replicationType,
+      Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
     URIish uri = getURI(project);
-    return schedule(project, ref, uri, state, replicationType);
+    return schedule(project, ref, uri, state, replicationType, apiRequestMetrics);
   }
 
   public Future<?> schedule(
@@ -402,7 +404,8 @@
       String ref,
       URIish uri,
       ReplicationState state,
-      ReplicationType replicationType) {
+      ReplicationType replicationType,
+      Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
 
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
@@ -438,7 +441,7 @@
       FetchOne e = pending.get(uri);
       Future<?> f = CompletableFuture.completedFuture(null);
       if (e == null) {
-        e = opFactory.create(project, uri);
+        e = opFactory.create(project, uri, apiRequestMetrics);
         addRef(e, ref);
         e.addState(ref, state);
         pending.put(uri, e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 2767f1d..48964db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -101,7 +101,10 @@
     @SuppressWarnings("unchecked")
     WorkQueue.Task<Void> task =
         (WorkQueue.Task<Void>)
-            workQueue.getDefaultQueue().submit(new FetchJob(command, project, input));
+            workQueue
+                .getDefaultQueue()
+                .submit(
+                    new FetchJob(command, project, input, PullReplicationApiRequestMetrics.get()));
     Optional<String> url =
         urlFormatter
             .get()
@@ -117,17 +120,23 @@
     private FetchCommand command;
     private Project.NameKey project;
     private FetchAction.Input input;
+    private final PullReplicationApiRequestMetrics apiRequestMetrics;
 
-    public FetchJob(FetchCommand command, Project.NameKey project, FetchAction.Input input) {
+    public FetchJob(
+        FetchCommand command,
+        Project.NameKey project,
+        FetchAction.Input input,
+        PullReplicationApiRequestMetrics apiRequestMetrics) {
       this.command = command;
       this.project = project;
       this.input = input;
+      this.apiRequestMetrics = apiRequestMetrics;
     }
 
     @Override
     public void run() {
       try {
-        command.fetchAsync(project, input.label, input.refName);
+        command.fetchAsync(project, input.label, input.refName, apiRequestMetrics);
       } catch (InterruptedException
           | ExecutionException
           | RemoteConfigurationMissingException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index e1ac9ac..3a502ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -54,19 +54,28 @@
     this.eventDispatcher = eventDispatcher;
   }
 
-  public void fetchAsync(Project.NameKey name, String label, String refName)
+  public void fetchAsync(
+      Project.NameKey name,
+      String label,
+      String refName,
+      PullReplicationApiRequestMetrics apiRequestMetrics)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    fetch(name, label, refName, ASYNC);
+    fetch(name, label, refName, ASYNC, Optional.of(apiRequestMetrics));
   }
 
   public void fetchSync(Project.NameKey name, String label, String refName)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    fetch(name, label, refName, SYNC);
+    fetch(name, label, refName, SYNC, Optional.empty());
   }
 
-  private void fetch(Project.NameKey name, String label, String refName, ReplicationType fetchType)
+  private void fetch(
+      Project.NameKey name,
+      String label,
+      String refName,
+      ReplicationType fetchType,
+      Optional<PullReplicationApiRequestMetrics> apiRequestMetrics)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
     ReplicationState state =
@@ -82,7 +91,7 @@
 
     try {
       state.markAllFetchTasksScheduled();
-      Future<?> future = source.get().schedule(name, refName, state, fetchType);
+      Future<?> future = source.get().schedule(name, refName, state, fetchType, apiRequestMetrics);
       future.get(source.get().getTimeout(), TimeUnit.SECONDS);
     } catch (ExecutionException
         | IllegalStateException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
index b2ef28d..b140cb4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
@@ -38,5 +38,9 @@
     } else {
       serveRegex("/init-project/.*$").with(ProjectInitializationAction.class);
     }
+
+    DynamicSet.bind(binder(), AllRequestFilter.class)
+        .to(PullReplicationApiMetricsFilter.class)
+        .in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java
new file mode 100644
index 0000000..3858db2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiMetricsFilter.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.httpd.AllRequestFilter;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@Singleton
+public class PullReplicationApiMetricsFilter extends AllRequestFilter {
+  private final Provider<PullReplicationApiRequestMetrics> apiRequestMetrics;
+
+  @Inject
+  public PullReplicationApiMetricsFilter(
+      Provider<PullReplicationApiRequestMetrics> apiRequestMetrics) {
+    this.apiRequestMetrics = apiRequestMetrics;
+  }
+
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+      throws IOException, ServletException {
+    if (!(request instanceof HttpServletRequest) || !(response instanceof HttpServletResponse)) {
+      chain.doFilter(request, response);
+      return;
+    }
+
+    PullReplicationApiRequestMetrics requestMetrics = apiRequestMetrics.get();
+    requestMetrics.start((HttpServletRequest) request);
+    PullReplicationApiRequestMetrics.set(requestMetrics);
+
+    chain.doFilter(request, response);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
new file mode 100644
index 0000000..8a97901
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.FetchReplicationMetrics;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.http.HttpServletRequest;
+
+public class PullReplicationApiRequestMetrics {
+  private static final ThreadLocal<PullReplicationApiRequestMetrics> localApiRequestMetrics =
+      new ThreadLocal<>();
+
+  public static final String HTTP_HEADER_X_START_TIME_NANOS = "X-StartTimeNanos";
+
+  private Optional<Long> startTimeNanos;
+  private final AtomicBoolean initialised = new AtomicBoolean();
+  private final FetchReplicationMetrics metrics;
+
+  public static PullReplicationApiRequestMetrics get() {
+    return localApiRequestMetrics.get();
+  }
+
+  public static void set(PullReplicationApiRequestMetrics metrics) {
+    localApiRequestMetrics.set(metrics);
+  }
+
+  @Inject
+  public PullReplicationApiRequestMetrics(FetchReplicationMetrics metrics) {
+    this.metrics = metrics;
+  }
+
+  public void start(HttpServletRequest req) {
+    if (!initialised.compareAndSet(false, true)) {
+      throw new IllegalStateException("PullReplicationApiRequestMetrics already initialised");
+    }
+
+    startTimeNanos =
+        Optional.ofNullable(req.getHeader(HTTP_HEADER_X_START_TIME_NANOS))
+            .map(Long::parseLong)
+            /* Adjust with System.nanoTime() for preventing negative execution times
+             * due to a clock skew between the client and the server timestamp.
+             */
+            .map(nanoTime -> Math.min(System.nanoTime(), nanoTime));
+  }
+
+  public Optional<Long> stop(String replicationSourceName) {
+    return startTimeNanos.map(
+        start -> {
+          long elapsed = System.nanoTime() - start;
+          metrics.recordEnd2End(replicationSourceName, elapsed);
+          return elapsed;
+        });
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 476a35b..01b6f1d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -27,9 +27,15 @@
     FetchApiClient create(Source source);
   }
 
-  HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+  HttpResult callFetch(
+      Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
       throws ClientProtocolException, IOException;
 
+  default HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+      throws ClientProtocolException, IOException {
+    return callFetch(project, refName, targetUri, System.nanoTime());
+  }
+
   HttpResult initProject(Project.NameKey project, URIish uri) throws IOException;
 
   HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index f2f57cd..4db4ceb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -32,6 +32,7 @@
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
 import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
@@ -95,7 +96,8 @@
    * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callFetch(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
    */
   @Override
-  public HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
+  public HttpResult callFetch(
+      Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
       throws ClientProtocolException, IOException {
     String url =
         String.format(
@@ -110,6 +112,9 @@
                 instanceLabel, refName, callAsync),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
+    post.addHeader(
+        PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS,
+        Long.toString(startTimeNanos));
     return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
new file mode 100644
index 0000000..58be58e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationAsyncIT extends PullReplicationIT {
+  @Inject private SitePaths sitePaths;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    FileBasedConfig config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    config.setString("replication", null, "syncRefs", "^$");
+    config.save();
+
+    super.setUpTestPlugin(true);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index c6d32bf..7fcff10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -50,6 +50,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -78,11 +79,12 @@
 @UseLocalDisk
 @TestPlugin(
     name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule")
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationIT extends LightweightPluginDaemonTest {
   private static final Optional<String> ALL_PROJECTS = Optional.empty();
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 60;
+  private static final int TEST_REPLICATION_DELAY = 1;
   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000);
   private static final String TEST_REPLICATION_SUFFIX = "suffix1";
   private static final String TEST_REPLICATION_REMOTE = "remote1";
@@ -96,10 +98,17 @@
 
   @Override
   public void setUpTestPlugin() throws Exception {
+    setUpTestPlugin(false);
+  }
+
+  protected void setUpTestPlugin(boolean loadExisting) throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
 
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    File configFile = sitePaths.etc_dir.resolve("replication.config").toFile();
+    config = new FileBasedConfig(configFile, FS.DETECTED);
+    if (loadExisting && configFile.exists()) {
+      config.load();
+    }
     setReplicationSource(
         TEST_REPLICATION_REMOTE,
         TEST_REPLICATION_SUFFIX,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 08d7e5a..56b6ad1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -19,6 +19,7 @@
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -81,6 +82,7 @@
   @Mock RevisionData revisionData;
   @Mock HttpResult httpResult;
   ApplyObjectMetrics applyObjectMetrics;
+  FetchReplicationMetrics fetchMetrics;
 
   @Captor ArgumentCaptor<String> stringCaptor;
   @Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor;
@@ -108,15 +110,24 @@
     when(fetchClientFactory.create(any())).thenReturn(fetchRestApiClient);
     when(fetchRestApiClient.callSendObject(any(), anyString(), anyBoolean(), any(), any()))
         .thenReturn(httpResult);
-    when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(httpResult);
+    when(fetchRestApiClient.callFetch(any(), anyString(), any(), anyLong())).thenReturn(httpResult);
     when(httpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isProjectMissing(any())).thenReturn(false);
 
     applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
+    fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
 
     objectUnderTest =
         new ReplicationQueue(
-            wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader, applyObjectMetrics);
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            revReader,
+            applyObjectMetrics,
+            fetchMetrics);
   }
 
   @Test
@@ -173,7 +184,7 @@
 
     objectUnderTest.onGitReferenceUpdated(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
   }
 
   @Test
@@ -186,7 +197,7 @@
 
     objectUnderTest.onGitReferenceUpdated(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
   }
 
   @Test
@@ -202,7 +213,7 @@
 
     objectUnderTest.onGitReferenceUpdated(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any(), anyLong());
   }
 
   @Test
@@ -248,7 +259,15 @@
 
     objectUnderTest =
         new ReplicationQueue(
-            wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader, applyObjectMetrics);
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            revReader,
+            applyObjectMetrics,
+            fetchMetrics);
     Event event = new TestEvent("refs/multi-site/version");
     objectUnderTest.onGitReferenceUpdated(event);
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index e1ad565..9af2d10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -35,6 +35,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
 import java.net.URISyntaxException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -56,6 +57,7 @@
   @Mock Source source;
   @Mock SourcesCollection sources;
   @Mock DynamicItem<EventDispatcher> eventDispatcher;
+  @Mock PullReplicationApiRequestMetrics apiRequestMetrics;
 
   @SuppressWarnings("rawtypes")
   @Mock
@@ -76,7 +78,7 @@
     when(fetchReplicationStateFactory.create(any())).thenReturn(state);
     when(source.getRemoteConfigName()).thenReturn(label);
     when(sources.getAll()).thenReturn(Lists.newArrayList(source));
-    when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any()))
+    when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any(), any()))
         .thenReturn(CompletableFuture.completedFuture(null));
     objectUnderTest =
         new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources, eventDispatcher);
@@ -88,16 +90,18 @@
           TimeoutException {
     objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+    verify(source, times(1))
+        .schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty());
   }
 
   @Test
   public void shouldScheduleRefFetchWithDelay()
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH);
+    objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH, apiRequestMetrics);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC);
+    verify(source, times(1))
+        .schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC, Optional.of(apiRequestMetrics));
   }
 
   @Test
@@ -106,7 +110,8 @@
           TimeoutException {
     objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+    verify(source, times(1))
+        .schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty());
     verify(state, times(1)).markAllFetchTasksScheduled();
   }
 
@@ -123,7 +128,8 @@
   public void shouldUpdateStateWhenInterruptedException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new InterruptedException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+        .thenReturn(future);
 
     InterruptedException e =
         assertThrows(
@@ -138,7 +144,8 @@
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS)))
         .thenThrow(new ExecutionException(new Exception()));
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+        .thenReturn(future);
 
     ExecutionException e =
         assertThrows(
@@ -152,7 +159,8 @@
   public void shouldUpdateStateWhenTimeoutException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new TimeoutException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC, Optional.empty()))
+        .thenReturn(future);
 
     TimeoutException e =
         assertThrows(