Merge branch 'stable-3.5' into stable-3.6

* stable-3.5:
  Rename metric tasks/max_retries to tasks/failed_max_retries
  Document metrics
  Fix completed tasks metrics
  Log fetch tasks when graceful shutdown fails
  Displaying refs for fetch tasks
  Leverage ShutdownState in SourcesCollection
  Fix pull replication queue metric prefix
  Set pull replication tests as large
  Introduce replication queue metrics
  Handle fetch tasks gracefully during shutdown

Change-Id: Iedbefd1b32b9d651b8f865fd485b7d3c0dfe3087
diff --git a/BUILD b/BUILD
index 0983983..972f0d8 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
         "Gerrit-InitStep: com.googlesource.gerrit.plugins.replication.pull.InitPlugin",
         "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.pull.SshModule",
         "Gerrit-HttpModule: com.googlesource.gerrit.plugins.replication.pull.api.HttpModule",
+        "Gerrit-ReloadMode: restart",
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
@@ -24,6 +25,7 @@
 
 junit_tests(
     name = "pull_replication_tests",
+    size = "large",
     srcs = glob([
         "src/test/java/**/*Test.java",
     ]),
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java
new file mode 100644
index 0000000..fa3d53b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Completable.java
@@ -0,0 +1,20 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+interface Completable {
+  /** Whether the task has succeeded; metrics count a task as completed only when this is true. */
+  boolean hasSucceeded();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index 2882482..4818ec7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -28,7 +28,7 @@
 import java.net.URISyntaxException;
 import org.eclipse.jgit.transport.URIish;
 
-public class DeleteProjectTask implements Runnable {
+public class DeleteProjectTask implements Runnable, Completable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   interface Factory {
@@ -40,6 +40,7 @@
   private final String uri;
   private final Project.NameKey project;
   private final FetchApiClient.Factory fetchClientFactory;
+  private boolean succeeded;
 
   @Inject
   DeleteProjectTask(
@@ -63,6 +64,7 @@
       if (!httpResult.isSuccessful()) {
         throw new IOException(httpResult.getMessage().orElse("Unknown"));
       }
+      succeeded = true;
       logger.atFine().log("Successfully deleted project %s on remote %s", project.get(), uri);
     } catch (URISyntaxException | IOException e) {
       String errorMessage =
@@ -76,4 +78,9 @@
   public String toString() {
     return String.format("[%s] delete-project %s at %s", HexFormat.fromInt(id), project.get(), uri);
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded; // set only after the remote delete-project call returned a success result
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 39db764..a6b5ab7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -67,7 +67,7 @@
  * <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
  */
-public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
+public class FetchOne implements ProjectRunnable, CanceledWhileRunning, Completable {
   private final ReplicationStateListener stateLog;
   public static final String ALL_REFS = "..all..";
   static final String ID_KEY = "fetchOneId";
@@ -103,6 +103,7 @@
   private final FetchFactory fetchFactory;
   private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
   private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
+  private boolean succeeded;
 
   @Inject
   FetchOne(
@@ -177,7 +178,7 @@
 
   @Override
   public String toString() {
-    String print = "[" + taskIdHex + "] fetch " + uri;
+    String print = "[" + taskIdHex + "] fetch " + uri + " [" + String.join(",", delta) + "]";
 
     if (retryCount > 0) {
       print = "(retry " + retryCount + ") " + print;
@@ -503,6 +504,7 @@
         case FORCED:
         case RENAMED:
         case FAST_FORWARD:
+          succeeded = true;
           break;
         case NOT_ATTEMPTED:
         case REJECTED:
@@ -580,4 +582,9 @@
   public Optional<PullReplicationApiRequestMetrics> getRequestMetrics() {
     return apiRequestMetrics;
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded; // set when a ref update reports a successful result (e.g. FAST_FORWARD)
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 03345e2..49a0ca1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.EventTypes;
@@ -33,6 +34,7 @@
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import com.google.inject.name.Names;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
 import com.googlesource.gerrit.plugins.replication.ConfigParser;
@@ -66,15 +68,20 @@
 class PullReplicationModule extends AbstractModule {
   private final SitePaths site;
   private final Path cfgPath;
+  private final MetricMaker pluginMetricMaker;
 
   @Inject
-  public PullReplicationModule(SitePaths site) {
+  public PullReplicationModule(SitePaths site, MetricMaker pluginMetricMaker) {
     this.site = site;
     cfgPath = site.etc_dir.resolve("replication.config");
+    this.pluginMetricMaker = pluginMetricMaker; // bound @Named(REPLICATION_QUEUE_METRICS) below
   }
 
   @Override
   protected void configure() {
+    bind(MetricMaker.class)
+        .annotatedWith(Names.named(ReplicationQueueMetrics.REPLICATION_QUEUE_METRICS))
+        .toInstance(pluginMetricMaker);
 
     bind(CapabilityDefinition.class)
         .annotatedWith(Exports.named(CALL_FETCH_ACTION))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index b88db69..1ba47de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -32,6 +32,7 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -65,6 +66,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Singleton
 public class ReplicationQueue
     implements ObservableQueue,
         EventListener,
@@ -79,6 +81,7 @@
   private static final String REF_UDPATED_EVENT_TYPE = new RefUpdatedEvent().type;
   private static final String ZEROS_OBJECTID = ObjectId.zeroId().getName();
   private final ReplicationStateListener stateLog;
+  private final ShutdownState shutdownState;
 
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
@@ -92,6 +95,7 @@
   private Provider<RevisionReader> revReaderProvider;
   private final ApplyObjectMetrics applyObjectMetrics;
   private final FetchReplicationMetrics fetchMetrics;
+  private final ReplicationQueueMetrics queueMetrics;
   private final String instanceId;
   private ApplyObjectsRefsFilter applyObjectsRefsFilter;
 
@@ -106,18 +110,22 @@
       Provider<RevisionReader> revReaderProvider,
       ApplyObjectMetrics applyObjectMetrics,
       FetchReplicationMetrics fetchMetrics,
+      ReplicationQueueMetrics queueMetrics,
       @GerritInstanceId String instanceId,
-      ApplyObjectsRefsFilter applyObjectsRefsFilter) {
+      ApplyObjectsRefsFilter applyObjectsRefsFilter,
+      ShutdownState shutdownState) {
     workQueue = wq;
     dispatcher = dis;
     sources = rd;
     stateLog = sl;
+    this.shutdownState = shutdownState;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
     this.fetchClientFactory = fetchClientFactory;
     this.refsFilter = refsFilter;
     this.revReaderProvider = revReaderProvider;
     this.applyObjectMetrics = applyObjectMetrics;
     this.fetchMetrics = fetchMetrics;
+    this.queueMetrics = queueMetrics;
     this.instanceId = instanceId;
     this.applyObjectsRefsFilter = applyObjectsRefsFilter;
   }
@@ -126,6 +134,7 @@
   public void start() {
     if (!running) {
       sources.get().startup(workQueue);
+      queueMetrics.start(this);
       fetchCallsTimeout =
           2
               * sources.get().getAll().stream()
@@ -141,10 +150,12 @@
   @Override
   public void stop() {
     running = false;
+    shutdownState.setIsShuttingDown(true); // published before the sources are shut down
     int discarded = sources.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
     }
+    queueMetrics.stop(); // unregister every queue metric once the sources are shut down
   }
 
   @Override
@@ -211,6 +222,8 @@
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
       beforeStartupEventsQueue.add(event);
+
+      queueMetrics.incrementQueuedBeforStartup();
       return;
     }
     ForkJoinPool fetchCallsPool = null;
@@ -538,6 +551,10 @@
                     .forEach(apiUrl -> s.scheduleUpdateHead(apiUrl, p, event.getNewHeadName())));
   }
 
+  SourcesCollection sourcesCollection() {
+    return sources.get(); // read by the per-source gauges in ReplicationQueueMetrics
+  }
+
   @AutoValue
   abstract static class ReferenceUpdatedEvent {
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
new file mode 100644
index 0000000..741927f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
@@ -0,0 +1,332 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.logging.PluginMetadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Pull-replication queue metrics. Task metrics are tagged with the name of their source.
+ *
+ * <p>Counters are registered by the constructor; the {@code tasks/inflight} and {@code
+ * tasks/pending} gauges are registered by {@link #start}. {@link #stop} removes every metric this
+ * object registered.
+ */
+@Singleton
+public class ReplicationQueueMetrics {
+  private static final String EVENTS = "events";
+  private static final String TASKS = "tasks";
+  public static final String REPLICATION_QUEUE_METRICS = "ReplicationQueueMetrics";
+
+  private final Counter1<String> tasksScheduled;
+  private final Counter1<String> tasksCancelled;
+  private final Counter1<String> tasksNotScheduled;
+  private final Counter1<String> tasksRescheduled;
+  private final Counter1<String> tasksCompleted;
+  private final Counter1<String> tasksMerged;
+  private final Counter1<String> tasksFailed;
+  private final Counter1<String> tasksRetrying;
+  private final Counter0 eventsQueuedBeforeStartup;
+  private final Counter1<String> tasksCancelledMaxRetries;
+  private final MetricMaker metricMaker;
+  private final Field<String> sourceField;
+  private final Counter1<String> tasksStarted;
+  // Every metric and trigger registered by this object, so that stop() can remove them all.
+  private final Set<RegistrationHandle> metricsHandles;
+
+  /**
+   * Wraps a scheduled task so that running it increments {@code tasks/started} and, when the task
+   * is {@link Completable} and reports success, {@code tasks/completed}.
+   */
+  public class RunnableWithMetrics implements Runnable {
+    private final Source source;
+    private final Runnable runnable;
+
+    public RunnableWithMetrics(Source source, Runnable runnable) {
+      this.source = source;
+      this.runnable = runnable;
+    }
+
+    @Override
+    public void run() {
+      incrementTaskStarted(source);
+      runnable.run();
+      // Only tasks able to report their outcome can contribute to tasks/completed.
+      if (runnable instanceof Completable) {
+        if (((Completable) runnable).hasSucceeded()) {
+          incrementTaskCompleted(source);
+        }
+      }
+    }
+
+    /**
+     * Returns the wrapped task's description, so that anything printing a scheduled task (for a
+     * fetch: its URI and refs) shows the task itself rather than this wrapper's identity.
+     */
+    @Override
+    public String toString() {
+      return runnable.toString();
+    }
+  }
+
+  /**
+   * Creates the per-source task counters and the {@code events/queued_before_startup} counter.
+   *
+   * @param pluginName plugin name attached to the metadata of the per-source metrics
+   * @param metricMaker maker used to create every metric registered by this object
+   */
+  @Inject
+  public ReplicationQueueMetrics(
+      @PluginName String pluginName, @Named(REPLICATION_QUEUE_METRICS) MetricMaker metricMaker) {
+    metricsHandles = new HashSet<>();
+
+    sourceField =
+        Field.ofString(
+                "source",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+            .build();
+
+    eventsQueuedBeforeStartup =
+        registerMetric(
+            metricMaker.newCounter(
+                "events/queued_before_startup",
+                new Description("Replication events queued before startup")
+                    .setCumulative()
+                    .setUnit(EVENTS)));
+
+    tasksScheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/scheduled",
+                new Description("Replication tasks scheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksStarted =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/started",
+                new Description("Replication tasks started").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksRescheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/rescheduled",
+                new Description("Replication tasks rescheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCancelled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/cancelled",
+                new Description("Replication tasks cancelled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksFailed =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/failed",
+                new Description("Replication tasks failed").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksRetrying =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/retrying",
+                new Description("Replication tasks retrying").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksNotScheduled =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/not_scheduled",
+                new Description("Replication tasks not scheduled").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCompleted =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/completed",
+                new Description("Replication tasks completed").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksMerged =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/merged",
+                new Description("Replication tasks merged").setCumulative().setUnit(TASKS),
+                sourceField));
+
+    tasksCancelledMaxRetries =
+        registerMetric(
+            metricMaker.newCounter(
+                "tasks/failed_max_retries",
+                new Description("Replication tasks cancelled for maximum number of retries")
+                    .setCumulative()
+                    .setUnit(TASKS),
+                sourceField));
+
+    this.metricMaker = metricMaker;
+  }
+
+  // Remembers the handle so that stop() can remove it; returns it for convenient chaining.
+  private <T extends RegistrationHandle> T registerMetric(T metricHandle) {
+    metricsHandles.add(metricHandle);
+    return metricHandle;
+  }
+
+  /**
+   * Registers the {@code tasks/inflight} and {@code tasks/pending} gauges, computed from the
+   * sources of {@code queue}.
+   */
+  void start(ReplicationQueue queue) {
+    initCallbackMetrics(
+        queue,
+        Source::inflightTasksCount,
+        "tasks/inflight",
+        "In-flight replication tasks per source");
+    initCallbackMetrics(
+        queue, Source::pendingTasksCount, "tasks/pending", "Pending replication tasks per source");
+  }
+
+  /** Removes every metric and trigger registered by this object. */
+  void stop() {
+    metricsHandles.forEach(RegistrationHandle::remove);
+  }
+
+  /**
+   * Registers a per-source gauge named {@code metricName} whose values are obtained by applying
+   * {@code sourceMetricFunc} to each source of {@code queue} each time its trigger fires.
+   */
+  private void initCallbackMetrics(
+      ReplicationQueue queue,
+      Function<Source, Long> sourceMetricFunc,
+      String metricName,
+      String description) {
+    CallbackMetric1<String, Long> metric =
+        registerMetric(
+            metricMaker.newCallbackMetric(
+                metricName,
+                Long.class,
+                new Description(description).setGauge().setUnit(TASKS),
+                sourceField));
+    registerMetric(
+        metricMaker.newTrigger(
+            metric,
+            () -> {
+              List<Source> sources = queue.sourcesCollection().getAll();
+              if (sources.isEmpty()) {
+                metric.forceCreate("");
+              } else {
+                sources.forEach(
+                    source ->
+                        metric.set(source.getRemoteConfigName(), sourceMetricFunc.apply(source)));
+                // Prune once, after all current sources were set: prune() drops sub-metrics not
+                // set in this trigger, so calling it inside the loop would discard the sources
+                // not yet visited and force them to be re-created on every sample.
+                metric.prune();
+              }
+            }));
+  }
+
+  /** Counts a task scheduled for {@code source}. */
+  public void incrementTaskScheduled(Source source) {
+    tasksScheduled.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a pending task for {@code source} that was cancelled. */
+  public void incrementTaskCancelled(Source source) {
+    tasksCancelled.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts an event queued because it arrived before the replication queue started. */
+  public void incrementQueuedBeforStartup() {
+    eventsQueuedBeforeStartup.increment();
+  }
+
+  /** Counts a task for {@code source} that ran and reported success. */
+  public void incrementTaskCompleted(Source source) {
+    tasksCompleted.increment(source.getRemoteConfigName());
+  }
+
+  /** Alias of {@link #incrementTaskFailed(Source)}. */
+  public void failTask(Source source) {
+    incrementTaskFailed(source);
+  }
+
+  /** Counts a task for {@code source} merged with another pending task. */
+  public void incrementTaskMerged(Source source) {
+    tasksMerged.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a request for {@code source} that did not result in a newly scheduled task. */
+  public void incrementTaskNotScheduled(Source source) {
+    tasksNotScheduled.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a task for {@code source} that was rescheduled. */
+  public void incrementTaskRescheduled(Source source) {
+    tasksRescheduled.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a failed task attempt for {@code source}. */
+  public void incrementTaskFailed(Source source) {
+    tasksFailed.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a failed task for {@code source} that was scheduled for retry. */
+  public void incrementTaskRetrying(Source source) {
+    tasksRetrying.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a task for {@code source} cancelled after exhausting its retries. */
+  public void incrementTaskCancelledMaxRetries(Source source) {
+    tasksCancelledMaxRetries.increment(source.getRemoteConfigName());
+  }
+
+  /** Counts a task for {@code source} that started running. */
+  public void incrementTaskStarted(Source source) {
+    tasksStarted.increment(source.getRemoteConfigName());
+  }
+
+  /**
+   * Wraps {@code runnableTask} so its execution is counted against {@code source}. A task that is
+   * already a {@link RunnableWithMetrics} is returned unchanged, so it is never counted twice.
+   */
+  public Runnable runWithMetrics(Source source, Runnable runnableTask) {
+    if (runnableTask instanceof RunnableWithMetrics) {
+      return runnableTask;
+    }
+
+    return new RunnableWithMetrics(source, runnableTask);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java
new file mode 100644
index 0000000..0f99a7f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Singleton holding this plugin's shutdown state. Inject it into any component that needs to know
+ * whether a shutdown has been initiated.
+ */
+@Singleton
+public class ShutdownState {
+  private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
+
+  @Inject
+  public ShutdownState() {}
+
+  /** Records whether a shutdown has been initiated. */
+  public void setIsShuttingDown(boolean shutting) {
+    shuttingDown.set(shutting);
+  }
+
+  /** Returns the value last passed to {@link #setIsShuttingDown}, initially {@code false}. */
+  public boolean isShuttingDown() {
+    return shuttingDown.get();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index f75ea7b..5ab3859 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -16,7 +16,10 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -72,6 +75,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +88,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.errors.TransportException;
@@ -119,6 +124,9 @@
   private final DynamicItem<EventDispatcher> eventDispatcher;
   private CloseableHttpClient httpClient;
   private final DeleteProjectTask.Factory deleteProjectFactory;
+  private final ReplicationQueueMetrics queueMetrics;
+  private static final int DRAINED_CHECK_FREQUENCY_MS = 50;
+  private static final int DRAINED_LOGGING_FREQUENCY_SECS = 5;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -148,7 +156,8 @@
       GroupBackend groupBackend,
       ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
-      DynamicItem<EventDispatcher> eventDispatcher) {
+      DynamicItem<EventDispatcher> eventDispatcher,
+      ReplicationQueueMetrics queueMetrics) {
     config = cfg;
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
@@ -156,6 +165,7 @@
     this.userProvider = userProvider;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
+    this.queueMetrics = queueMetrics;
 
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -256,7 +266,15 @@
   public synchronized int shutdown() {
     int cnt = 0;
     if (pool != null) {
-      cnt = pool.shutdownNow().size();
+      try {
+        waitUntil(this::isDrained, Duration.ofSeconds(config.getShutDownDrainTimeout()));
+        cnt = pool.shutdownNow().size();
+      } catch (InterruptedException e) {
+        logger.atSevere().withCause(e).log("Interrupted during termination.");
+        List<Runnable> fetchTasks = pool.shutdownNow();
+        logInterruptedShutdownStatus(fetchTasks);
+        cnt = fetchTasks.size();
+      }
       pool = null;
     }
     if (httpClient != null) {
@@ -271,6 +289,64 @@
     return cnt;
   }
 
+  /**
+   * Logs at error level the never-executed tasks in {@code fetchTasks} and the fetches still
+   * pending or in flight, to diagnose a graceful shutdown that was interrupted or timed out.
+   */
+  private void logInterruptedShutdownStatus(List<Runnable> fetchTasks) {
+    String neverExecutedTasks =
+        fetchTasks.stream().map(Object::toString).collect(Collectors.joining(","));
+    String pendingTasks =
+        pending.values().stream().map(FetchOne::toString).collect(Collectors.joining(","));
+    String inFlightTasks =
+        inFlight.values().stream().map(FetchOne::toString).collect(Collectors.joining(","));
+
+    repLog.error("Never executed tasks: {}", neverExecutedTasks);
+    repLog.error("Pending tasks: {}", pendingTasks);
+    repLog.error("In-flight tasks: {}", inFlightTasks);
+  }
+
+  /**
+   * Returns whether this source has no pending and no in-flight fetches. While it has some, a
+   * warning is logged at most every {@code DRAINED_LOGGING_FREQUENCY_SECS} seconds.
+   */
+  private boolean isDrained() {
+    int numberOfPending = pending.size();
+    int numberOfInFlight = inFlight.size();
+
+    boolean drained = numberOfPending == 0 && numberOfInFlight == 0;
+
+    if (!drained) {
+      logger.atWarning().atMostEvery(DRAINED_LOGGING_FREQUENCY_SECS, SECONDS).log(
+          "Queue not drained: %d pending|%d in-flight", numberOfPending, numberOfInFlight);
+    } else {
+      // Reaching an empty queue is the goal being waited for, not a warning condition.
+      logger.atInfo().log("Queue drained");
+    }
+
+    return drained;
+  }
+
+  /**
+   * Polls {@code waitCondition} every {@code DRAINED_CHECK_FREQUENCY_MS} milliseconds until it
+   * returns {@code true}.
+   *
+   * @throws InterruptedException if the thread is interrupted while sleeping, or if {@code
+   *     timeout} elapses before the condition is met
+   */
+  private static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        // A timeout is reported as InterruptedException too; the message distinguishes it from
+        // a genuine interrupt wherever the exception is logged.
+        throw new InterruptedException("Wait condition not met within " + timeout);
+      }
+      MILLISECONDS.sleep(DRAINED_CHECK_FREQUENCY_MS);
+    }
+  }
+
   private boolean shouldReplicate(ProjectState state, CurrentUser user)
       throws PermissionBackendException {
     if (!config.replicateHiddenProjects()
@@ -402,6 +460,7 @@
 
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
+      queueMetrics.incrementTaskNotScheduled(this);
       return CompletableFuture.completedFuture(null);
     }
 
@@ -417,14 +476,17 @@
             if (head != null
                 && head.isSymbolic()
                 && RefNames.REFS_CONFIG.equals(head.getLeaf().getName())) {
+              queueMetrics.incrementTaskNotScheduled(this);
               return CompletableFuture.completedFuture(null);
             }
           } catch (IOException err) {
             stateLog.error(String.format("cannot check type of project %s", project), err, state);
+            queueMetrics.incrementTaskNotScheduled(this);
             return CompletableFuture.completedFuture(null);
           }
         } catch (IOException err) {
           stateLog.error(String.format("source project %s not available", project), err, state);
+          queueMetrics.incrementTaskNotScheduled(this);
           return CompletableFuture.completedFuture(null);
         }
       }
@@ -438,10 +500,18 @@
         addRef(e, ref);
         e.addState(ref, state);
         pending.put(uri, e);
-        f = pool.schedule(e, isSyncCall(replicationType) ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        f =
+            pool.schedule(
+                queueMetrics.runWithMetrics(this, e),
+                isSyncCall(replicationType) ? 0 : config.getDelay(),
+                TimeUnit.SECONDS);
+        queueMetrics.incrementTaskScheduled(this);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
         e.addState(ref, state);
+        queueMetrics.incrementTaskMerged(this);
+      } else {
+        queueMetrics.incrementTaskNotScheduled(this);
       }
       state.increaseFetchTaskCount(project.get(), ref);
       repLog.info("scheduled {}:{} => {} to run after {}s", e, ref, project, config.getDelay());
@@ -452,13 +522,18 @@
   void scheduleDeleteProject(String uri, Project.NameKey project) {
     @SuppressWarnings("unused")
     ScheduledFuture<?> ignored =
-        pool.schedule(deleteProjectFactory.create(this, uri, project), 0, TimeUnit.SECONDS);
+        pool.schedule(
+            queueMetrics.runWithMetrics(this, deleteProjectFactory.create(this, uri, project)),
+            0,
+            TimeUnit.SECONDS);
+    queueMetrics.incrementTaskScheduled(this);
   }
 
   void fetchWasCanceled(FetchOne fetchOp) {
     synchronized (stateLock) {
       URIish uri = fetchOp.getURI();
       pending.remove(uri);
+      queueMetrics.incrementTaskCancelled(this);
     }
   }
 
@@ -522,6 +597,8 @@
                   fetchOp.getTaskIdHex(), fetchOp.getURI(), pendingFetchOp.getTaskIdHex()),
               fetchOp.getStatesAsArray());
 
+          queueMetrics.incrementTaskRescheduled(this);
+
         } else {
           // The one pending is one that is NOT retrying, it was just
           // scheduled believing no problem would happen. The one pending
@@ -546,6 +623,8 @@
                   "[%s] Merging the pending fetch from [%s] with task [%s] and rescheduling",
                   pendingFetchOp.getTaskIdHex(), pendingFetchOp.getURI(), fetchOp.getTaskIdHex()),
               pendingFetchOp.getStatesAsArray());
+
+          queueMetrics.incrementTaskMerged(this);
         }
       }
 
@@ -553,7 +632,11 @@
         pending.put(uri, fetchOp);
         switch (reason) {
           case COLLISION:
-            pool.schedule(fetchOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+            queueMetrics.incrementTaskRescheduled(this);
+            pool.schedule(
+                queueMetrics.runWithMetrics(this, fetchOp),
+                config.getRescheduleDelay(),
+                TimeUnit.SECONDS);
             break;
           case TRANSPORT_ERROR:
           case REPOSITORY_MISSING:
@@ -563,15 +646,24 @@
                     ? RefUpdate.Result.NOT_ATTEMPTED
                     : RefUpdate.Result.REJECTED_OTHER_REASON;
             postReplicationFailedEvent(fetchOp, trackingRefUpdate);
+            queueMetrics.incrementTaskFailed(this);
+
             if (fetchOp.setToRetry()) {
               postReplicationScheduledEvent(fetchOp);
-              pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+              // Wrap the retry with the queue metrics, as done for the COLLISION reschedule, so
+              // that the retried run is accounted for like any other scheduled fetch.
+              pool.schedule(
+                  queueMetrics.runWithMetrics(this, fetchOp),
+                  config.getRetryDelay(),
+                  TimeUnit.MINUTES);
+              queueMetrics.incrementTaskRetrying(this);
             } else {
               fetchOp.canceledByReplication();
               pending.remove(uri);
               stateLog.error(
                   "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
                   fetchOp.getStatesAsArray());
+              queueMetrics.incrementTaskCancelledMaxRetries(this);
             }
             break;
         }
@@ -794,13 +886,27 @@
       @SuppressWarnings("unused")
       ScheduledFuture<?> ignored =
           pool.schedule(
-              updateHeadFactory.create(this, apiURI, project, newHead), 0, TimeUnit.SECONDS);
+              queueMetrics.runWithMetrics(
+                  this, updateHeadFactory.create(this, apiURI, project, newHead)),
+              0,
+              TimeUnit.SECONDS);
+      queueMetrics.incrementTaskScheduled(this);
     } catch (URISyntaxException e) {
       logger.atSevere().withCause(e).log(
           "Could not schedule HEAD pull-replication for project %s", project.get());
     }
   }
 
+  /** Returns the number of fetch tasks tracked as in flight by this source. */
+  long inflightTasksCount() {
+    return inFlight.size();
+  }
+
+  /** Returns the number of fetch tasks in this source's pending map, keyed by remote URI. */
+  long pendingTasksCount() {
+    return pending.size();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index aff1266..0e840bd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -29,6 +29,7 @@
   static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
   static final int DEFAULT_CONNECTION_TIMEOUT_MS = 5000;
   static final int DEFAULT_CONNECTIONS_PER_ROUTE = 100;
+  static final int DEFAULT_DRAIN_SHUTDOWN_TIMEOUT_SECS = 300;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -51,6 +52,7 @@
   private final int maxConnectionsPerRoute;
   private final int maxConnections;
   private final int maxRetries;
+  private final int shutDownDrainTimeout;
   private int slowLatencyThreshold;
   private boolean useCGitClient;
   private int refsBatchSize;
@@ -98,6 +100,16 @@
                 "slowLatencyThreshold",
                 DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
                 TimeUnit.SECONDS);
+
+    shutDownDrainTimeout =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "replication",
+                null,
+                "shutDownDrainTimeout",
+                DEFAULT_DRAIN_SHUTDOWN_TIMEOUT_SECS,
+                TimeUnit.SECONDS);
   }
 
   @Override
@@ -216,4 +228,9 @@
   public int getPushBatchSize() {
     return 0;
   }
+
+  /** Returns how long, in seconds, to wait for fetch tasks to drain on plugin shutdown. */
+  public int getShutDownDrainTimeout() {
+    return shutDownDrainTimeout;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
index 53adaaa..8bd1d95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -40,7 +40,7 @@
 
   private final Source.Factory sourceFactory;
   private volatile Map<String, Source> sources;
-  private boolean shuttingDown;
+  private final ShutdownState shutdownState;
   private final Provider<ReplicationQueue> replicationQueue;
 
   @Inject
@@ -49,9 +49,11 @@
       ConfigParser configParser,
       Source.Factory sourceFactory,
       EventBus eventBus,
-      Provider<ReplicationQueue> replicationQueue)
+      Provider<ReplicationQueue> replicationQueue,
+      ShutdownState shutdownState)
       throws ConfigInvalidException {
     this.sourceFactory = sourceFactory;
+    this.shutdownState = shutdownState;
     this.sources =
         allSources(sourceFactory, configParser.parseRemotes(replicationConfig.getConfig()));
     this.replicationQueue = replicationQueue;
@@ -78,7 +80,7 @@
 
   @Override
   public void startup(WorkQueue workQueue) {
-    shuttingDown = false;
+    shutdownState.setIsShuttingDown(false);
     for (Source cfg : sources.values()) {
       cfg.start(workQueue);
     }
@@ -99,9 +101,7 @@
    */
   @Override
   public int shutdown() {
-    synchronized (this) {
-      shuttingDown = true;
-    }
+    shutdownState.setIsShuttingDown(true);
 
     int discarded = 0;
     for (Source cfg : sources.values()) {
@@ -117,7 +117,7 @@
 
   @Subscribe
   public synchronized void onReload(List<RemoteConfiguration> sourceConfigurations) {
-    if (shuttingDown) {
+    if (shutdownState.isShuttingDown()) {
       logger.atWarning().log("Shutting down: configuration reload ignored");
       return;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
index fabe3cd..9dfe26e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -27,7 +27,7 @@
 import java.io.IOException;
 import org.eclipse.jgit.transport.URIish;
 
-public class UpdateHeadTask implements Runnable {
+public class UpdateHeadTask implements Runnable, Completable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final FetchApiClient.Factory fetchClientFactory;
   private final Source source;
@@ -35,6 +35,7 @@
   private final Project.NameKey project;
   private final String newHead;
   private final int id;
+  private boolean succeeded;
 
   interface Factory {
     UpdateHeadTask create(Source source, URIish apiURI, Project.NameKey project, String newHead);
@@ -64,6 +65,7 @@
       if (!httpResult.isSuccessful()) {
         throw new IOException(httpResult.getMessage().orElse("Unknown"));
       }
+      succeeded = true;
       logger.atFine().log(
           "Successfully updated HEAD of project %s on remote %s",
           project.get(), apiURI.toASCIIString());
@@ -83,4 +85,9 @@
         "[%s] update-head %s at %s to %s",
         HexFormat.fromInt(id), project.get(), apiURI.toASCIIString(), newHead);
   }
+
+  @Override
+  public boolean hasSucceeded() {
+    return succeeded;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index 6fda9d3..d06681b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -24,22 +24,26 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.ShutdownState;
 import java.util.function.Consumer;
 
 public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
 
   private final DynamicItem<BrokerApi> eventsBroker;
   private final StreamEventListener eventListener;
+  private final ShutdownState shutdownState;
   private final String eventsTopicName;
 
   @Inject
   public EventsBrokerMessageConsumer(
       DynamicItem<BrokerApi> eventsBroker,
       StreamEventListener eventListener,
+      ShutdownState shutdownState,
       @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
 
     this.eventsBroker = eventsBroker;
     this.eventListener = eventListener;
+    this.shutdownState = shutdownState;
     this.eventsTopicName = eventsTopicName;
   }
 
@@ -47,6 +51,11 @@
   public void accept(Event event) {
     try {
       eventListener.fetchRefsForEvent(event);
+      // Once shutdown has begun, the event just received is still processed, but the
+      // consumer is then stopped, which disconnects it from the events broker.
+      if (shutdownState.isShuttingDown()) {
+        stop();
+      }
     } catch (AuthException | PermissionBackendException e) {
       throw new EventRejectedException(event, e);
     }
@@ -59,6 +68,7 @@
 
   @Override
   public void stop() {
+    shutdownState.setIsShuttingDown(true);
     eventsBroker.get().disconnect();
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 65b03b8..6287773 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -341,6 +341,30 @@
 
 	Default: 10000
 
+replication.shutDownDrainTimeout
+:   Maximum duration to wait for pending, retrying and executing fetch tasks to
+	complete after a plugin shutdown has been requested. Values should use common unit
+	suffixes to express their setting:
+
+	s, sec, second, seconds
+
+	m, min, minute, minutes
+
+	h, hr, hour, hours
+
+	d, day, days
+
+	w, week, weeks (1 week is treated as 7 days)
+
+	mon, month, months (1 month is treated as 30 days)
+
+	y, year, years (1 year is treated as 365 days)
+
+	If a unit suffix is not specified, seconds is assumed. The timeout bounds how long
+	the plugin shutdown is delayed while those fetch tasks complete.
+
+	Default: 5 minutes
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..6fd6cf2
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,90 @@
+Metrics
+=======
+
+The @PLUGIN@ plugin reports the status of its internal components
+to the Gerrit metrics system.
+
+The metrics are all prefixed with plugins/@PLUGIN@ and then divided into groups,
+depending on the component involved.
+
+### plugins/@PLUGIN@/events
+
+This prefix represents the component involved in the processing of incoming
+ref-update events from Gerrit or other event-broker sources.
+
+- `queued_before_startup`: (counter) number of events that have been received
+  while the plugin was still starting up and not yet ready to process events.
+
+### plugins/@PLUGIN@/tasks/<metric>/<source>
+
+This prefix represents the tasks scheduling and execution system, also
+known as _replication queue_. The replication task represents one operation
+performed against a Git repository from a remote source. The task may involve
+one or more refs.
+
+The `<metric>` field can have one of the values described below,
+while the `<source>` field represents the replication source endpoint.
+
+> Bear in mind that the _replication queue_ is a lot more than a simple
+> queueing system, as it contains the logic for selecting, merging, retrying
+> and cancelling incoming tasks.
+
+- `scheduled`: (counter) number of tasks triggered and scheduled for
+  execution.
+
+- `started`: (counter) number of tasks started.
+
+- `rescheduled`: (counter) number of tasks re-scheduled for execution.
+
+- `cancelled`: (counter) number of tasks that were previously scheduled
+  and then cancelled before being executed.
+
+- `failed`: (counter) number of tasks executed but failed altogether or
+  partially. A total failure is when the entire operation returned an
+  error and none of the operations took place; a partial failure is when
+  some of the operations in the task succeeded but others failed.
+
+- `retrying`: (counter) number of tasks scheduled to be retried after a failure.
+
+- `not_scheduled`: (counter) number of tasks which have been discarded before
+  being executed because they were redundant (duplicates of existing scheduled
+  tasks) or no longer needed (e.g. the ref had already been updated by other
+  means, or other checks made the task unnecessary).
+
+- `completed`: (counter) number of tasks completed successfully.
+
+- `merged`: (counter) number of tasks not being executed because they had
+  been consolidated with an existing scheduled task.
+
+- `failed_max_retries`: (counter) number of tasks that have reached their maximum
+  retry count but never succeeded.
+
+### plugins/@PLUGIN@
+
+- `apply_object_latency`: (timer) execution time statistics for the
+  synchronous replication using the _apply-object_ REST-API.
+
+- `apply_object_end_2_end_latency`: (timer) execution time statistics
+  for the end-2-end replication from when the event is triggered in
+  Gerrit until the successful execution of the synchronous replication
+  using the _apply-object_ REST-API.
+
+- `apply_object_max_api_payload_reached`: (counter) number of times that
+  the apply-object REST-API did fallback to the fetch REST-API because
+  it reached its maximum payload to transfer.
+
+- `replication_latency`: (timer) execution time statistics for the
+  synchronous replication using a _git fetch_.
+
+- `replication_end_2_end_latency`: (timer) execution time statistics
+  for the end-2-end replication from when the event is triggered in
+  Gerrit until the successful execution of the _git fetch_ for the
+  associated refs.
+
+- `replication_delay`: (timer) interval from when the ref-update event
+  is triggered to the start of the _git fetch_ command.
+
+- `replication_retries`: (counter) number of times that a replication task
+  has been retried.
+
+
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 0756c5c..d5d4546 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -112,15 +112,27 @@
 
   @Test
   public void shouldIncludeTheTaskIndexInItsStringRepresentation() {
-    String expected = "[" + objectUnderTest.getTaskIdHex() + "] fetch " + URI_PATTERN;
+    objectUnderTest.addRefs(Set.of("refs/heads/foo", "refs/heads/bar"));
+    String expected =
+        "["
+            + objectUnderTest.getTaskIdHex()
+            + "] fetch "
+            + URI_PATTERN
+            + " [refs/heads/bar,refs/heads/foo]";
 
     assertThat(objectUnderTest.toString()).isEqualTo(expected);
   }
 
   @Test
   public void shouldIncludeTheRetryCountInItsStringRepresentationWhenATaskIsRetried() {
+    objectUnderTest.addRefs(Set.of("refs/heads/bar", "refs/heads/foo"));
     objectUnderTest.setToRetry();
-    String expected = "(retry 1) [" + objectUnderTest.getTaskIdHex() + "] fetch " + URI_PATTERN;
+    String expected =
+        "(retry 1) ["
+            + objectUnderTest.getTaskIdHex()
+            + "] fetch "
+            + URI_PATTERN
+            + " [refs/heads/bar,refs/heads/foo]";
 
     assertThat(objectUnderTest.toString()).isEqualTo(expected);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java
new file mode 100644
index 0000000..bd85f4f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/InMemoryMetricMaker.java
@@ -0,0 +1,134 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.Field;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Metric maker for tests: exports no metrics, but keeps the values of the counters it creates in
+ * memory so that tests can read them back through {@link #counterValue}.
+ *
+ * <p>Only {@link Counter0} and {@link Counter1} values are recorded; every other metric type
+ * behaves as in {@link DisabledMetricMaker}.
+ */
+@Singleton
+public class InMemoryMetricMaker extends DisabledMetricMaker {
+
+  /** {@link Counter0} whose running total is kept in memory. */
+  public static class InMemoryCounter0 extends Counter0 {
+    private final AtomicLong counter = new AtomicLong();
+
+    @Override
+    public void remove() {}
+
+    @Override
+    public void incrementBy(long delta) {
+      counter.addAndGet(delta);
+    }
+
+    /** Returns the sum of all increments applied to this counter. */
+    public long getValue() {
+      return counter.get();
+    }
+
+    @Override
+    public String toString() {
+      return counter.toString();
+    }
+  }
+
+  /** {@link Counter1} that keeps a separate in-memory total for each field value. */
+  public static class InMemoryCounter1<F> extends Counter1<F> {
+    private final ConcurrentHashMap<F, Long> countersMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void remove() {}
+
+    @Override
+    public void incrementBy(F field, long delta) {
+      countersMap.merge(field, delta, Long::sum);
+    }
+
+    /** Returns the total for {@code fieldValue}, or empty if it was never incremented. */
+    public Optional<Long> getValue(Object fieldValue) {
+      return Optional.ofNullable(countersMap.get(fieldValue));
+    }
+
+    @Override
+    public String toString() {
+      return countersMap.toString();
+    }
+  }
+
+  /** Counters without fields, indexed by metric name. */
+  private final ConcurrentHashMap<String, InMemoryCounter0> counters0Map =
+      new ConcurrentHashMap<>();
+
+  /** Counters with one field, indexed by metric name. */
+  private final ConcurrentHashMap<String, InMemoryCounter1<?>> counters1Map =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates an in-memory counter and registers it under {@code name}, replacing any counter
+   * previously registered with the same name.
+   */
+  @Override
+  public Counter0 newCounter(String name, Description desc) {
+    InMemoryCounter0 counter = new InMemoryCounter0();
+    counters0Map.put(name, counter);
+    return counter;
+  }
+
+  /**
+   * Creates an in-memory single-field counter and registers it under {@code name}, replacing any
+   * counter previously registered with the same name.
+   */
+  @Override
+  public <F> Counter1<F> newCounter(String name, Description desc, Field<F> field) {
+    InMemoryCounter1<F> counter = new InMemoryCounter1<>();
+    counters1Map.put(name, counter);
+    return counter;
+  }
+
+  /**
+   * Returns the value of the field-less counter registered as {@code name}, or empty if no such
+   * counter was created.
+   */
+  public Optional<Long> counterValue(String name) {
+    return Optional.ofNullable(counters0Map.get(name)).map(InMemoryCounter0::getValue);
+  }
+
+  /**
+   * Returns the value recorded for {@code fieldValue} by the single-field counter registered as
+   * {@code name}, or empty if that counter was never created or never incremented for the value.
+   */
+  public <F> Optional<Long> counterValue(String name, F fieldValue) {
+    return Optional.ofNullable(counters1Map.get(name))
+        .flatMap(counter -> counter.getValue(fieldValue));
+  }
+
+  @Override
+  public String toString() {
+    return counters0Map.toString() + "\n" + counters1Map.toString();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
index be017d0..0ebd0ae 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
@@ -26,7 +26,8 @@
 @UseLocalDisk
 @TestPlugin(
     name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index 4c00bae..29bf7e4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -58,7 +58,8 @@
 @UseLocalDisk
 @TestPlugin(
     name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    sysModule =
+        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
 public class PullReplicationIT extends PullReplicationSetupBase {
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index ba812e2..58693bc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
 import static com.google.gerrit.acceptance.GitUtil.fetch;
 import static com.google.gerrit.acceptance.GitUtil.pushOne;
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
@@ -31,6 +32,8 @@
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.util.Collection;
@@ -52,6 +55,13 @@
 /** Base class to run regular and async acceptance tests */
 public abstract class PullReplicationITAbstract extends PullReplicationSetupBase {
 
+  public static class PullReplicationTestModule extends PullReplicationModule {
+    @Inject
+    public PullReplicationTestModule(SitePaths site, InMemoryMetricMaker memMetric) {
+      super(site, memMetric);
+    }
+  }
+
   @Override
   protected void setReplicationSource(
       String remoteName, List<String> replicaSuffixes, Optional<String> project)
@@ -91,6 +101,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -101,6 +112,13 @@
     }
   }
 
+  private void assertTasksMetricScheduledAndCompleted(int numTasks) {
+    assertTasksMetric("scheduled", numTasks);
+    assertTasksMetric("started", numTasks);
+    assertTasksMetric("completed", numTasks);
+    assertEmptyTasksMetric("failed");
+  }
+
   @Test
   @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
@@ -124,6 +142,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -133,6 +152,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -167,6 +188,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, newBranch) != null);
@@ -193,6 +215,7 @@
             amendedCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(forcedPushEvent);
+    waitUntilReplicationCompleted(2);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -204,6 +227,8 @@
                       .getName()
                       .equals(amendedCommit.getId().getName()));
     }
+
+    assertTasksMetricScheduledAndCompleted(2);
   }
 
   @Test
@@ -232,6 +257,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -240,6 +266,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -273,6 +301,7 @@
             branchRevision,
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project);
         Repository sourceRepo = repoManager.openRepository(project)) {
@@ -282,6 +311,8 @@
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
     }
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -309,8 +340,11 @@
     for (ProjectDeletedListener l : deletedListeners) {
       l.onProjectDeleted(event);
     }
+    waitUntilReplicationCompleted(1);
 
     waitUntil(() -> !repoManager.list().contains(project));
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Test
@@ -335,6 +369,7 @@
 
     HeadUpdatedListener.Event event = new FakeHeadUpdateEvent(master, newBranch, testProjectName);
     pullReplicationQueue.onHeadUpdated(event);
+    waitUntilReplicationCompleted(1);
 
     waitUntil(
         () -> {
@@ -344,6 +379,8 @@
             return false;
           }
         });
+
+    assertTasksMetricScheduledAndCompleted(1);
   }
 
   @Ignore
@@ -365,6 +402,7 @@
             sourceCommit.getId().getName(),
             TEST_REPLICATION_REMOTE);
     pullReplicationQueue.onEvent(event);
+    waitUntilReplicationCompleted(1);
 
     try (Repository repo = repoManager.openRepository(project)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -374,4 +412,27 @@
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
   }
+
+  private void waitUntilReplicationCompleted(int expected) throws InterruptedException {
+    waitUntil(
+        () ->
+            inMemoryMetrics()
+                .counterValue("tasks/completed", TEST_REPLICATION_REMOTE)
+                .filter(counter -> counter == expected)
+                .isPresent());
+  }
+
+  private InMemoryMetricMaker inMemoryMetrics() {
+    return getInstance(InMemoryMetricMaker.class);
+  }
+
+  private void assertTasksMetric(String taskMetric, long value) {
+    assertThat(inMemoryMetrics().counterValue("tasks/" + taskMetric, TEST_REPLICATION_REMOTE))
+        .hasValue(value);
+  }
+
+  private void assertEmptyTasksMetric(String taskMetric) {
+    assertThat(inMemoryMetrics().counterValue("tasks/" + taskMetric, TEST_REPLICATION_REMOTE))
+        .isEmpty();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 9b7e8c1..e40b90c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -98,6 +98,8 @@
   @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter;
   ApplyObjectMetrics applyObjectMetrics;
   FetchReplicationMetrics fetchMetrics;
+  ReplicationQueueMetrics queueMetrics;
+  ShutdownState shutdownState;
 
   @Captor ArgumentCaptor<String> stringCaptor;
   @Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor;
@@ -157,6 +159,8 @@
 
     applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
     fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
+    queueMetrics = new ReplicationQueueMetrics("pull-replication", new DisabledMetricMaker());
+    shutdownState = new ShutdownState();
 
     objectUnderTest =
         new ReplicationQueue(
@@ -169,8 +173,10 @@
             () -> revReader,
             applyObjectMetrics,
             fetchMetrics,
+            queueMetrics,
             LOCAL_INSTANCE_ID,
-            applyObjectsRefsFilter);
+            applyObjectsRefsFilter,
+            shutdownState);
   }
 
   @Test
@@ -345,8 +351,10 @@
             () -> revReader,
             applyObjectMetrics,
             fetchMetrics,
+            queueMetrics,
             LOCAL_INSTANCE_ID,
-            applyObjectsRefsFilter);
+            applyObjectsRefsFilter,
+            shutdownState);
     Event event = new TestEvent("refs/multi-site/version");
     objectUnderTest.onEvent(event);
 
@@ -354,6 +362,12 @@
   }
 
   @Test
+  public void shouldSetShutdownStateWhenStopping() throws IOException {
+    objectUnderTest.stop();
+    assertThat(shutdownState.isShuttingDown()).isTrue();
+  }
+
+  @Test
   public void shouldSkipEventWhenStarredChangesRef() {
     Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
     objectUnderTest.onEvent(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 394a8a6..9640b82 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -18,12 +18,16 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.ShutdownState;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,13 +38,18 @@
 public class EventsBrokerMessageConsumerTest {
 
   @Mock private StreamEventListener eventListener;
-  @Mock DynamicItem<BrokerApi> eventsBroker;
+  @Mock DynamicItem<BrokerApi> eventsBrokerDynamicItem;
+  @Mock BrokerApi eventsBroker;
 
   EventsBrokerMessageConsumer objectUnderTest;
+  ShutdownState shutdownState;
 
   @Before
   public void setup() {
-    objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName");
+    shutdownState = new ShutdownState();
+    objectUnderTest =
+        new EventsBrokerMessageConsumer(
+            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
   }
 
   @Test
@@ -63,4 +72,16 @@
     doNothing().when(eventListener).fetchRefsForEvent(any());
     objectUnderTest.accept(new RefUpdatedEvent());
   }
+
+  @Test
+  public void shouldStillAcceptLastEventDuringShutdownAndThenDisconnect()
+      throws AuthException, PermissionBackendException {
+    doNothing().when(eventListener).fetchRefsForEvent(any());
+    when(eventsBrokerDynamicItem.get()).thenReturn(eventsBroker);
+
+    shutdownState.setIsShuttingDown(true);
+
+    objectUnderTest.accept(new RefUpdatedEvent());
+    verify(eventsBroker, times(1)).disconnect();
+  }
 }