Introduce replication queue metrics

The pull-replication queue has a lot of intermediate
states for the events fired and the replication tasks
allocated in the various queues.

Expose the internal status of the queue with fine-grained
counters that make it possible to understand the overall
status of every source.

The plugin reload mode is now set to "restart" because it is
no longer possible to have two plugins (old and new) loaded
at the same time at any point in time; otherwise the metric
names would conflict.

Change-Id: I42db2dfe8b8de9afc202f650ae42d164f9754c51
diff --git a/BUILD b/BUILD
index 0983983..da87d80 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
         "Gerrit-InitStep: com.googlesource.gerrit.plugins.replication.pull.InitPlugin",
         "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.pull.SshModule",
         "Gerrit-HttpModule: com.googlesource.gerrit.plugins.replication.pull.api.HttpModule",
+        "Gerrit-ReloadMode: restart",
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java
new file mode 100644
index 0000000..fa3d53b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java
@@ -0,0 +1,20 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+interface Completable {
+  /** Returns true if the task has succeeded; callers in this change query it after run(). */
+  boolean hasSucceeded();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index 527b746..89a9067 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -28,7 +28,7 @@
 import java.net.URISyntaxException;
 import org.eclipse.jgit.transport.URIish;
 
-public class DeleteProjectTask implements Runnable {
+public class DeleteProjectTask implements Runnable, Completable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   interface Factory {
@@ -40,6 +40,7 @@
   private final String uri;
   private final Project.NameKey project;
   private final FetchApiClient.Factory fetchClientFactory;
+  private boolean succeeded;
 
   @Inject
   DeleteProjectTask(
@@ -63,6 +64,7 @@
       if (!httpResult.isSuccessful()) {
         throw new IOException(httpResult.getMessage().orElse("Unknown"));
       }
+      succeeded = true;
       logger.atFine().log("Successfully deleted project %s on remote %s", project.get(), uri);
     } catch (URISyntaxException | IOException e) {
       String errorMessage =
@@ -76,4 +78,9 @@
   public String toString() {
     return String.format("[%s] delete-project %s at %s", HexFormat.fromInt(id), project.get(), uri);
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 42bea3e..3fe64e4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -66,7 +66,7 @@
  * <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
  */
-public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
+public class FetchOne implements ProjectRunnable, CanceledWhileRunning, Completable {
   private final ReplicationStateListener stateLog;
   public static final String ALL_REFS = "..all..";
   static final String ID_KEY = "fetchOneId";
@@ -102,6 +102,7 @@
   private final FetchFactory fetchFactory;
   private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
   private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
+  private boolean succeeded;
 
   @Inject
   FetchOne(
@@ -491,6 +492,7 @@
         case FORCED:
         case RENAMED:
         case FAST_FORWARD:
+          succeeded = true;
           break;
         case NOT_ATTEMPTED:
         case REJECTED:
@@ -568,4 +570,9 @@
   public Optional<PullReplicationApiRequestMetrics> getRequestMetrics() {
     return apiRequestMetrics;
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index d1c5ca1..63d6a95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -23,6 +23,8 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.dropwizard.DropWizardMetricMaker;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.EventTypes;
@@ -32,6 +34,7 @@
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import com.google.inject.name.Names;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
 import com.googlesource.gerrit.plugins.replication.ConfigParser;
@@ -74,6 +77,9 @@
 
   @Override
   protected void configure() {
+    bind(MetricMaker.class)
+        .annotatedWith(Names.named(ReplicationQueueMetrics.REPLICATION_QUEUE_METRICS))
+        .to(metricMakerClass());
 
     install(new PullReplicationGroupModule());
     bind(BearerTokenProvider.class).in(Scopes.SINGLETON);
@@ -155,6 +161,10 @@
     EventTypes.register(FetchReplicationScheduledEvent.TYPE, FetchReplicationScheduledEvent.class);
   }
 
+  protected Class<? extends MetricMaker> metricMakerClass() {
+    return DropWizardMetricMaker.class;
+  }
+
   private FileBasedConfig getReplicationConfig() {
     File replicationConfigFile = cfgPath.toFile();
     FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 57612cb..6bafaa4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -32,6 +32,7 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -65,6 +66,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Singleton
 public class ReplicationQueue
     implements ObservableQueue,
         EventListener,
@@ -93,6 +95,7 @@
   private Provider<RevisionReader> revReaderProvider;
   private final ApplyObjectMetrics applyObjectMetrics;
   private final FetchReplicationMetrics fetchMetrics;
+  private final ReplicationQueueMetrics queueMetrics;
   private final String instanceId;
   private ApplyObjectsRefsFilter applyObjectsRefsFilter;
 
@@ -107,6 +110,7 @@
       Provider<RevisionReader> revReaderProvider,
       ApplyObjectMetrics applyObjectMetrics,
       FetchReplicationMetrics fetchMetrics,
+      ReplicationQueueMetrics queueMetrics,
       @GerritInstanceId String instanceId,
       ApplyObjectsRefsFilter applyObjectsRefsFilter,
       ShutdownState shutdownState) {
@@ -121,6 +125,7 @@
     this.revReaderProvider = revReaderProvider;
     this.applyObjectMetrics = applyObjectMetrics;
     this.fetchMetrics = fetchMetrics;
+    this.queueMetrics = queueMetrics;
     this.instanceId = instanceId;
     this.applyObjectsRefsFilter = applyObjectsRefsFilter;
   }
@@ -129,6 +134,7 @@
   public void start() {
     if (!running) {
       sources.get().startup(workQueue);
+      queueMetrics.start(this);
       fetchCallsTimeout =
           2
               * sources.get().getAll().stream()
@@ -149,6 +155,7 @@
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
+    queueMetrics.stop();
   }
 
   @Override
@@ -232,6 +239,8 @@
           state);
       beforeStartupEventsQueue.add(
           ReferenceUpdatedEvent.create(project.get(), refName, objectId, eventCreatedOn, isDelete));
+
+      queueMetrics.incrementQueuedBeforStartup();
       return;
     }
     ForkJoinPool fetchCallsPool = null;
@@ -552,6 +561,10 @@
                     .forEach(apiUrl -> s.scheduleUpdateHead(apiUrl, p, event.getNewHeadName())));
   }
 
+  SourcesCollection sourcesCollection() {
+    return sources.get();
+  }
+
   @AutoValue
   abstract static class ReferenceUpdatedEvent {
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
new file mode 100644
index 0000000..9b8134f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
@@ -0,0 +1,247 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.logging.PluginMetadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+@Singleton
+public class ReplicationQueueMetrics {
+  private static final String EVENTS = "events";
+  private static final String TASKS = "tasks";
+  public static final String REPLICATION_QUEUE_METRICS = "ReplicationQueueMetrics";
+
+  private final Counter1<String> tasksScheduled;
+  private final Counter1<String> tasksCancelled;
+  private final Counter1<String> tasksNotScheduled;
+  private final Counter1<String> tasksRescheduled;
+  private final Counter1<String> tasksCompleted;
+  private final Counter1<String> tasksMerged;
+  private final Counter1<String> tasksFailed;
+  private final Counter1<String> tasksRetrying;
+  private final Counter0 eventsQueuedBeforeStartup;
+  private final Counter1<String> tasksCancelledMaxRetries;
+  private final MetricMaker metricMaker;
+  private final Field<String> sourceField;
+  private final Counter1<String> tasksStarted;
+  private final Set<RegistrationHandle> metricsHandles;
+
+  @Inject
+  public ReplicationQueueMetrics(
+      @PluginName String pluginName, @Named(REPLICATION_QUEUE_METRICS) MetricMaker metricMaker) {
+    metricsHandles = new HashSet<>();
+
+    sourceField =
+        Field.ofString(
+                "source",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+            .build();
+
+    eventsQueuedBeforeStartup =
+        registerMetric(
+            metricMaker.newCounter(
+                "events/queued_before_startup",
+                new Description("Replication events queued before startup")
+                    .setCumulative()
+                    .setUnit(EVENTS)));
+
+    tasksScheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/scheduled",
+                new Description("Replication tasks scheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksStarted =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/started",
+                new Description("Replication tasks started").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksRescheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/rescheduled",
+                new Description("Replication tasks rescheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCancelled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/cancelled",
+                new Description("Replication tasks cancelled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksFailed =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/failed",
+                new Description("Replication tasks failed").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksRetrying =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/retrying",
+                new Description("Replication tasks retrying").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksNotScheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/not_scheduled",
+                new Description("Replication tasks not scheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCompleted =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/completed",
+                new Description("Replication tasks completed").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksMerged =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/merged",
+                new Description("Replication tasks merged").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCancelledMaxRetries =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/max_retries",
+                new Description("Replication tasks cancelled for maximum number of retries")
+                    .setCumulative()
+                    .setUnit(TASKS),
+                sourceField));
+
+    this.metricMaker = metricMaker;
+  }
+
+  private <T extends RegistrationHandle> T registerMetric(T metricHandle) {
+    metricsHandles.add(metricHandle);
+    return metricHandle;
+  }
+
+  void start(ReplicationQueue queue) {
+    initCallbackMetrics(
+        queue,
+        Source::inflightTasksCount,
+        "tasks/inflight",
+        "In-flight replication tasks per source");
+    initCallbackMetrics(
+        queue, Source::pendingTasksCount, "tasks/pending", "Pending replication tasks per source");
+  }
+
+  void stop() {
+    metricsHandles.forEach(RegistrationHandle::remove);
+  }
+
+  /** Registers a gauge keyed by source name, plus the trigger that refreshes its values. */
+  private void initCallbackMetrics(
+      ReplicationQueue queue,
+      Function<Source, Long> sourceMetricFunc,
+      String metricName,
+      String description) {
+    CallbackMetric1<String, Long> metric =
+        registerMetric(
+            metricMaker.newCallbackMetric(
+                metricName,
+                Long.class,
+                new Description(description).setGauge().setUnit(TASKS),
+                sourceField));
+    registerMetric(
+        metricMaker.newTrigger(
+            metric,
+            () -> {
+              List<Source> sources = queue.sourcesCollection().getAll();
+              if (sources.isEmpty()) {
+                metric.forceCreate("");
+              } else {
+                sources.forEach(
+                    s -> metric.set(s.getRemoteConfigName(), sourceMetricFunc.apply(s)));
+                // Prune once, after every source is set, so only stale entries are dropped.
+                metric.prune();
+              }
+            }));
+  }
+
+  public void incrementTaskScheduled(Source source) {
+    tasksScheduled.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskCancelled(Source source) {
+    tasksCancelled.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementQueuedBeforStartup() {
+    eventsQueuedBeforeStartup.increment();
+  }
+
+  public void incrementTaskCompleted(Source source) {
+    tasksCompleted.increment(source.getRemoteConfigName());
+  }
+
+  public void failTask(Source source) {
+    incrementTaskFailed(source);
+  }
+
+  public void incrementTaskMerged(Source source) {
+    tasksMerged.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskNotScheduled(Source source) {
+    tasksNotScheduled.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskRescheduled(Source source) {
+    tasksRescheduled.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskFailed(Source source) {
+    tasksFailed.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskRetrying(Source source) {
+    tasksRetrying.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskCancelledMaxRetries(Source source) {
+    tasksCancelledMaxRetries.increment(source.getRemoteConfigName());
+  }
+
+  public void incrementTaskStarted(Source source) {
+    tasksStarted.increment(source.getRemoteConfigName());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 82e2fea..5e65de7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -124,6 +124,7 @@
   private final DynamicItem<EventDispatcher> eventDispatcher;
   private CloseableHttpClient httpClient;
   private final DeleteProjectTask.Factory deleteProjectFactory;
+  private final ReplicationQueueMetrics queueMetrics;
   private static final int DRAINED_CHECK_FREQUENCY_MS = 50;
   private static final int DRAINED_LOGGING_FREQUENCY_SECS = 5;
 
@@ -155,7 +156,8 @@
       GroupBackend groupBackend,
       ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
+      DynamicItem<EventDispatcher> eventDispatcher,
+      ReplicationQueueMetrics queueMetrics) {
     config = cfg;
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
@@ -163,6 +165,7 @@
     this.userProvider = userProvider;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
+    this.queueMetrics = queueMetrics;
 
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -443,6 +446,7 @@
 
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
+      queueMetrics.incrementTaskNotScheduled(this);
       return CompletableFuture.completedFuture(null);
     }
 
@@ -458,14 +462,17 @@
             if (head != null
                 && head.isSymbolic()
                 && RefNames.REFS_CONFIG.equals(head.getLeaf().getName())) {
+              queueMetrics.incrementTaskNotScheduled(this);
               return CompletableFuture.completedFuture(null);
             }
           } catch (IOException err) {
             stateLog.error(String.format("cannot check type of project %s", project), err, state);
+            queueMetrics.incrementTaskNotScheduled(this);
             return CompletableFuture.completedFuture(null);
           }
         } catch (IOException err) {
           stateLog.error(String.format("source project %s not available", project), err, state);
+          queueMetrics.incrementTaskNotScheduled(this);
           return CompletableFuture.completedFuture(null);
         }
       }
@@ -479,10 +486,18 @@
         addRef(e, ref);
         e.addState(ref, state);
         pending.put(uri, e);
-        f = pool.schedule(e, isSyncCall(replicationType) ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        f =
+            pool.schedule(
+                runWithMetrics(e),
+                isSyncCall(replicationType) ? 0 : config.getDelay(),
+                TimeUnit.SECONDS);
+        queueMetrics.incrementTaskScheduled(this);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
         e.addState(ref, state);
+        queueMetrics.incrementTaskMerged(this);
+      } else {
+        queueMetrics.incrementTaskNotScheduled(this);
       }
       state.increaseFetchTaskCount(project.get(), ref);
       repLog.info("scheduled {}:{} => {} to run after {}s", e, ref, project, config.getDelay());
@@ -493,13 +508,16 @@
   void scheduleDeleteProject(String uri, Project.NameKey project) {
     @SuppressWarnings("unused")
     ScheduledFuture<?> ignored =
-        pool.schedule(deleteProjectFactory.create(this, uri, project), 0, TimeUnit.SECONDS);
+        pool.schedule(
+            runWithMetrics(deleteProjectFactory.create(this, uri, project)), 0, TimeUnit.SECONDS);
+    queueMetrics.incrementTaskScheduled(this);
   }
 
   void fetchWasCanceled(FetchOne fetchOp) {
     synchronized (stateLock) {
       URIish uri = fetchOp.getURI();
       pending.remove(uri);
+      queueMetrics.incrementTaskCancelled(this);
     }
   }
 
@@ -563,6 +581,8 @@
                   fetchOp.getTaskIdHex(), fetchOp.getURI(), pendingFetchOp.getTaskIdHex()),
               fetchOp.getStatesAsArray());
 
+          queueMetrics.incrementTaskRescheduled(this);
+
         } else {
           // The one pending is one that is NOT retrying, it was just
           // scheduled believing no problem would happen. The one pending
@@ -587,6 +607,8 @@
                   "[%s] Merging the pending fetch from [%s] with task [%s] and rescheduling",
                   pendingFetchOp.getTaskIdHex(), pendingFetchOp.getURI(), fetchOp.getTaskIdHex()),
               pendingFetchOp.getStatesAsArray());
+
+          queueMetrics.incrementTaskMerged(this);
         }
       }
 
@@ -594,7 +616,8 @@
         pending.put(uri, fetchOp);
         switch (reason) {
           case COLLISION:
-            pool.schedule(fetchOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+            queueMetrics.incrementTaskRescheduled(this);
+            pool.schedule(runWithMetrics(fetchOp), config.getRescheduleDelay(), TimeUnit.SECONDS);
             break;
           case TRANSPORT_ERROR:
           case REPOSITORY_MISSING:
@@ -604,15 +627,19 @@
                     ? RefUpdate.Result.NOT_ATTEMPTED
                     : RefUpdate.Result.REJECTED_OTHER_REASON;
             postReplicationFailedEvent(fetchOp, trackingRefUpdate);
+            queueMetrics.incrementTaskFailed(this);
+
             if (fetchOp.setToRetry()) {
               postReplicationScheduledEvent(fetchOp);
               pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+              queueMetrics.incrementTaskRetrying(this);
             } else {
               fetchOp.canceledByReplication();
               pending.remove(uri);
               stateLog.error(
                   "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
                   fetchOp.getStatesAsArray());
+              queueMetrics.incrementTaskCancelledMaxRetries(this);
             }
             break;
         }
@@ -835,13 +862,24 @@
       @SuppressWarnings("unused")
       ScheduledFuture<?> ignored =
           pool.schedule(
-              updateHeadFactory.create(this, apiURI, project, newHead), 0, TimeUnit.SECONDS);
+              runWithMetrics(updateHeadFactory.create(this, apiURI, project, newHead)),
+              0,
+              TimeUnit.SECONDS);
+      queueMetrics.incrementTaskScheduled(this);
     } catch (URISyntaxException e) {
       logger.atSevere().withCause(e).log(
           "Could not schedule HEAD pull-replication for project %s", project.get());
     }
   }
 
+  long inflightTasksCount() {
+    return inFlight.size();
+  }
+
+  long pendingTasksCount() {
+    return pending.size();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
@@ -887,4 +925,16 @@
       Context.unsetLocalEvent();
     }
   }
+
+  /** Wraps a task so that its start and its successful completion are counted for this source. */
+  // NOTE(review): the lambda hides the task's own type and toString() from the pool; confirm OK.
+  private Runnable runWithMetrics(Runnable runnableTask) {
+    return () -> {
+      queueMetrics.incrementTaskStarted(Source.this);
+      runnableTask.run();
+      if (runnableTask instanceof Completable && ((Completable) runnableTask).hasSucceeded()) {
+        queueMetrics.incrementTaskCompleted(Source.this);
+      }
+    };
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
index e169eb3..b366b90 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -27,7 +27,7 @@
 import java.io.IOException;
 import org.eclipse.jgit.transport.URIish;
 
-public class UpdateHeadTask implements Runnable {
+public class UpdateHeadTask implements Runnable, Completable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final FetchApiClient.Factory fetchClientFactory;
   private final Source source;
@@ -35,6 +35,7 @@
   private final Project.NameKey project;
   private final String newHead;
   private final int id;
+  private boolean succeeded;
 
   interface Factory {
     UpdateHeadTask create(Source source, URIish apiURI, Project.NameKey project, String newHead);
@@ -64,6 +65,7 @@
       if (!httpResult.isSuccessful()) {
         throw new IOException(httpResult.getMessage().orElse("Unknown"));
       }
+      succeeded = true;
       logger.atFine().log(
           "Successfully updated HEAD of project %s on remote %s",
           project.get(), apiURI.toASCIIString());
@@ -83,4 +85,9 @@
         "[%s] update-head %s at %s to %s",
         HexFormat.fromInt(id), project.get(), apiURI.toASCIIString(), newHead);
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java
new file mode 100644
index 0000000..bd85f4f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java
@@ -0,0 +1,103 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.Field;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Exports no metrics, but keeps counters in memory for testing purposes. */
+@Singleton
+public class InMemoryMetricMaker extends DisabledMetricMaker {
+
+  public static class InMemoryCounter0 extends Counter0 {
+    private AtomicLong counter = new AtomicLong();
+
+    @Override
+    public void remove() {}
+
+    @Override
+    public void incrementBy(long delta) {
+      counter.addAndGet(delta);
+    }
+
+    public long getValue() {
+      return counter.get();
+    }
+
+    @Override
+    public String toString() {
+      return counter.toString();
+    }
+  }
+
+  public static class InMemoryCounter1<F> extends Counter1<F> {
+    private ConcurrentHashMap<F, Long> countersMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void remove() {}
+
+    @Override
+    public void incrementBy(F field, long delta) {
+      countersMap.compute(field, (f, counter) -> counter == null ? delta : counter + delta);
+    }
+
+    public Optional<Long> getValue(Object fieldValue) {
+      return Optional.ofNullable(countersMap.get(fieldValue));
+    }
+
+    @Override
+    public String toString() {
+      return countersMap.toString();
+    }
+  }
+
+  private ConcurrentHashMap<String, InMemoryCounter0> counters0Map = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, InMemoryCounter1<?>> counters1Map = new ConcurrentHashMap<>();
+
+  @Override
+  public Counter0 newCounter(String name, Description desc) {
+    InMemoryCounter0 counter = new InMemoryCounter0();
+    counters0Map.put(name, counter);
+    return counter;
+  }
+
+  @Override
+  public <F> Counter1<F> newCounter(String name, Description desc, Field<F> field) {
+    InMemoryCounter1<F> counter = new InMemoryCounter1<>();
+    counters1Map.put(name, counter);
+    return counter;
+  }
+
+  public Optional<Long> counterValue(String name) {
+    return Optional.ofNullable(counters0Map.get(name)).map(InMemoryCounter0::getValue);
+  }
+
+  public <F> Optional<Long> counterValue(String name, F fieldValue) {
+    return Optional.ofNullable(counters1Map.get(name))
+        .flatMap(counter -> counter.getValue(fieldValue));
+  }
+
+  @Override
+  public String toString() {
+    return counters0Map.toString() + "\n" + counters1Map.toString();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
index be017d0..0ebd0ae 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
@@ -26,7 +26,8 @@
 @UseLocalDisk
 @TestPlugin(
     name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index 6ee3c43..18f7a0c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -22,6 +22,7 @@
 @UseLocalDisk
 @TestPlugin(
     name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationIT extends PullReplicationITAbstract {}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index ba812e2..f9fa0da 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
 import static com.google.gerrit.acceptance.GitUtil.fetch;
 import static com.google.gerrit.acceptance.GitUtil.pushOne;
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
@@ -31,6 +32,9 @@
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.util.Collection;
@@ -52,6 +56,19 @@
 /** Base class to run regular and async acceptance tests */
 public abstract class PullReplicationITAbstract extends PullReplicationSetupBase {
 
+  public static class PullReplicationTestModule extends PullReplicationModule {
+
+    @Inject
+    public PullReplicationTestModule(SitePaths site) {
+      super(site); // site is passed through unchanged to PullReplicationModule
+    }
+
+    @Override
+    protected Class<? extends MetricMaker> metricMakerClass() {
+      return InMemoryMetricMaker.class; // the type the tests look up in inMemoryMetrics()
+    }
+  }
+
   @Override
   protected void setReplicationSource(
       String remoteName, List<String> replicaSuffixes, Optional<String> project)
@@ -91,6 +108,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -101,6 +119,13 @@
     }
   }
 
+  private void assertTasksMetricScheduledAndCompleted(int numTasks) {
+    for (String reached : new String[] {"scheduled", "started", "completed"}) {
+      assertTasksMetric(reached, numTasks); // each state must have counted every task
+    }
+    assertEmptyTasksMetric("failed"); // and the failed counter must have no value
+  }
+
   @Test
   @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
@@ -124,6 +149,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -133,6 +159,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -167,6 +195,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, newBranch) != null);
@@ -193,6 +222,7 @@
             amendedCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(forcedPushEvent);
+    waitUntilReplicationCompleted(2);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -204,6 +234,8 @@
                       .getName()
                       .equals(amendedCommit.getId().getName()));
     }
+
+    assertTasksMetricScheduledAndCompleted(2);
   }
 
   @Test
@@ -232,6 +264,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -240,6 +273,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -273,6 +308,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -282,6 +318,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -309,8 +347,11 @@
     for (ProjectDeletedListener l : deletedListeners) {
       l.onProjectDeleted(event);
     }
+    waitUntilReplicationCompleted(1);
 
     waitUntil(() -> !repoManager.list().contains(project));
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -335,6 +376,7 @@
 
     HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
     pullReplicationQueue.onHeadUpdated(event);
+    waitUntilReplicationCompleted(1);
 
     waitUntil(
         () -> {
@@ -344,6 +386,8 @@
             return false;
           }
         });
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Ignore
@@ -365,6 +409,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -374,4 +419,27 @@
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
   }
+
+  private void waitUntilReplicationCompleted(int expected) throws InterruptedException {
+    waitUntil(
+        () -> {
+          Optional<Long> completed =
+              inMemoryMetrics().counterValue("tasks/completed", TEST_REPLICATION_REMOTE);
+          return completed.map(count -> count == expected).orElse(false);
+        });
+  }
+
+  private InMemoryMetricMaker inMemoryMetrics() {
+    return getInstance(InMemoryMetricMaker.class); // presumably the plugin's instance - confirm
+  }
+
+  private void assertTasksMetric(String taskMetric, long value) {
+    String name = "tasks/" + taskMetric; // counter name for the given task state
+    assertThat(inMemoryMetrics().counterValue(name, TEST_REPLICATION_REMOTE)).hasValue(value);
+  }
+
+  private void assertEmptyTasksMetric(String taskMetric) {
+    String name = "tasks/" + taskMetric; // counter name for the given task state
+    assertThat(inMemoryMetrics().counterValue(name, TEST_REPLICATION_REMOTE)).isEmpty();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 0a88ef3..9dc0b47 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -98,6 +98,7 @@
   @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter;
   ApplyObjectMetrics applyObjectMetrics;
   FetchReplicationMetrics fetchMetrics;
+  ReplicationQueueMetrics queueMetrics;
   ShutdownState shutdownState;
 
   @Captor ArgumentCaptor<String> stringCaptor;
@@ -158,6 +159,7 @@
 
     applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
     fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
+    queueMetrics = new ReplicationQueueMetrics("pull-replication", new DisabledMetricMaker());
     shutdownState = new ShutdownState();
 
     objectUnderTest =
@@ -171,6 +173,7 @@
             () -> revReader,
             applyObjectMetrics,
             fetchMetrics,
+            queueMetrics,
             LOCAL_INSTANCE_ID,
             applyObjectsRefsFilter,
             shutdownState);
@@ -348,6 +351,7 @@
             () -> revReader,
             applyObjectMetrics,
             fetchMetrics,
+            queueMetrics,
             LOCAL_INSTANCE_ID,
             applyObjectsRefsFilter,
             shutdownState);