Handle fetch tasks gracefully during shutdown

When the pull-replication receives a shutdown request, it currently
interrupts all fetch tasks that are in execution as well as
dropping the pending, scheduled and retrying ones.

This causes those fetch events to be lost and the Gerrit node to be
out-of-sync.

To shutdown gracefully two actions are required:
1. Stop getting new requests (via HTTP and/or via broker)
2. Allow pending, retrying and submitted tasks to terminate

Address point 1. by atomically setting a global shutdown state, so that
the EventsBrokerMessageConsumer can disconnect from the broker (and thus
stop consuming).

Note that the HTTP layer does not require any explicit handling, since
the HTTP servlets and filters exposed by the pull-replication plugin
will unregistered by Gerrit itself, so that no new requests can be
received over HTTP (the client will receive 404s or 403s depending on
the authentication method) as soon as the shutdown request is initiated.

Address point 2. by explicitly waiting until the pending, retrying and
executing fetch tasks are completed.
The maximum amount to wait for is configurable via replication.config
via the new configuration value: `replication.shutDownDrainTimeout`.

Bug: Issue 298084095
Change-Id: Ic672c7df5f52f6cdfcd6e0c8a8cd029c51a88ec5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index fc77503..57612cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -79,6 +79,7 @@
   private static final String REF_UDPATED_EVENT_TYPE = new RefUpdatedEvent().type;
   private static final String ZEROS_OBJECTID = ObjectId.zeroId().getName();
   private final ReplicationStateListener stateLog;
+  private final ShutdownState shutdownState;
 
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
@@ -107,11 +108,13 @@
       ApplyObjectMetrics applyObjectMetrics,
       FetchReplicationMetrics fetchMetrics,
       @GerritInstanceId String instanceId,
-      ApplyObjectsRefsFilter applyObjectsRefsFilter) {
+      ApplyObjectsRefsFilter applyObjectsRefsFilter,
+      ShutdownState shutdownState) {
     workQueue = wq;
     dispatcher = dis;
     sources = rd;
     stateLog = sl;
+    this.shutdownState = shutdownState;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
     this.fetchClientFactory = fetchClientFactory;
     this.refsFilter = refsFilter;
@@ -141,6 +144,7 @@
   @Override
   public void stop() {
     running = false;
+    shutdownState.setIsShuttingDown(true);
     int discarded = sources.get().shutdown();
     if (discarded > 0) {
       repLog.warn("Canceled {} replication events during shutdown", discarded);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java
new file mode 100644
index 0000000..0f99a7f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ShutdownState.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Singleton object representing this plugin shutdown state. This class can be injected any consumer
+ * interested in knowing if a shutdown was initiated.
+ */
+@Singleton
+public class ShutdownState {
+  private final AtomicBoolean isShuttingDown;
+
+  @Inject
+  public ShutdownState() {
+    isShuttingDown = new AtomicBoolean(false);
+  }
+
+  public void setIsShuttingDown(boolean shutting) {
+    isShuttingDown.set(shutting);
+  }
+
+  public boolean isShuttingDown() {
+    return isShuttingDown.get();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index c903656..82e2fea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -17,7 +17,10 @@
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -73,6 +76,7 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.net.URLEncoder;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -120,6 +124,8 @@
   private final DynamicItem<EventDispatcher> eventDispatcher;
   private CloseableHttpClient httpClient;
   private final DeleteProjectTask.Factory deleteProjectFactory;
+  private static final int DRAINED_CHECK_FREQUENCY_MS = 50;
+  private static final int DRAINED_LOGGING_FREQUENCY_SECS = 5;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -257,7 +263,13 @@
   public synchronized int shutdown() {
     int cnt = 0;
     if (pool != null) {
-      cnt = pool.shutdownNow().size();
+      try {
+        waitUntil(this::isDrained, Duration.ofSeconds(config.getShutDownDrainTimeout()));
+        cnt = pool.shutdownNow().size();
+      } catch (InterruptedException e) {
+        logger.atSevere().withCause(e).log("Interrupted during termination.");
+        cnt = pool.shutdownNow().size();
+      }
       pool = null;
     }
     if (httpClient != null) {
@@ -272,6 +284,34 @@
     return cnt;
   }
 
+  private boolean isDrained() {
+    int numberOfPending = pending.size();
+    int numberOfInFlight = inFlight.size();
+
+    boolean drained = numberOfPending == 0 && numberOfInFlight == 0;
+
+    if (!drained) {
+      logger.atWarning().atMostEvery(DRAINED_LOGGING_FREQUENCY_SECS, SECONDS).log(
+          String.format(
+              "Queue not drained: %d pending|%d in-flight", numberOfPending, numberOfInFlight));
+    } else {
+      logger.atWarning().log("Queue drained");
+    }
+
+    return drained;
+  }
+
+  private static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        throw new InterruptedException();
+      }
+      MILLISECONDS.sleep(DRAINED_CHECK_FREQUENCY_MS);
+    }
+  }
+
   private boolean shouldReplicate(ProjectState state, CurrentUser user)
       throws PermissionBackendException {
     if (!config.replicateHiddenProjects()
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index ab9c634..3cbe306 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -29,6 +29,7 @@
   static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
   static final int DEFAULT_CONNECTION_TIMEOUT_MS = 5000;
   static final int DEFAULT_CONNECTIONS_PER_ROUTE = 100;
+  static final int DEFAULT_DRAIN_SHUTDOWN_TIMEOUT_SECS = 300;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -51,6 +52,7 @@
   private final int maxConnectionsPerRoute;
   private final int maxConnections;
   private final int maxRetries;
+  private final int shutDownDrainTimeout;
   private int slowLatencyThreshold;
   private boolean useCGitClient;
   private int refsBatchSize;
@@ -98,6 +100,16 @@
                 "slowLatencyThreshold",
                 DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
                 TimeUnit.SECONDS);
+
+    shutDownDrainTimeout =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "replication",
+                null,
+                "shutDownDrainTimeout",
+                DEFAULT_DRAIN_SHUTDOWN_TIMEOUT_SECS,
+                TimeUnit.SECONDS);
   }
 
   @Override
@@ -211,4 +223,8 @@
   public int getSlowLatencyThreshold() {
     return slowLatencyThreshold;
   }
+
+  public int getShutDownDrainTimeout() {
+    return shutDownDrainTimeout;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index 6fda9d3..d06681b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -24,22 +24,26 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.ShutdownState;
 import java.util.function.Consumer;
 
 public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
 
   private final DynamicItem<BrokerApi> eventsBroker;
   private final StreamEventListener eventListener;
+  private final ShutdownState shutdownState;
   private final String eventsTopicName;
 
   @Inject
   public EventsBrokerMessageConsumer(
       DynamicItem<BrokerApi> eventsBroker,
       StreamEventListener eventListener,
+      ShutdownState shutdownState,
       @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
 
     this.eventsBroker = eventsBroker;
     this.eventListener = eventListener;
+    this.shutdownState = shutdownState;
     this.eventsTopicName = eventsTopicName;
   }
 
@@ -47,6 +51,7 @@
   public void accept(Event event) {
     try {
       eventListener.fetchRefsForEvent(event);
+      if (shutdownState.isShuttingDown()) stop();
     } catch (AuthException | PermissionBackendException e) {
       throw new EventRejectedException(event, e);
     }
@@ -59,6 +64,7 @@
 
   @Override
   public void stop() {
+    shutdownState.setIsShuttingDown(true);
     eventsBroker.get().disconnect();
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 65b03b8..6287773 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -341,6 +341,30 @@
 
 	Default: 10000
 
+replication.shutDownDrainTimeout
+:   Maximum duration to wait for pending, retrying and executing fetch tasks to
+	complete after a request of plugin shutdown. Values should use common unit
+	suffixes to express their setting:
+
+	s, sec, second, seconds
+
+	m, min, minute, minutes
+
+	h, hr, hour, hours
+
+	d, day, days
+
+	w, week, weeks (1 week is treated as 7 days)
+
+	mon, month, months (1 month is treated as 30 days)
+
+	y, year, years (1 year is treated as 365 days)
+
+	If a unit suffix is not specified, seconds is assumed. If 0 is supplied, the maximum age
+	is infinite and items are never purged except when the cache is full.
+
+	Default: 5 minutes
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index d65322f..0a88ef3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -98,6 +98,7 @@
   @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter;
   ApplyObjectMetrics applyObjectMetrics;
   FetchReplicationMetrics fetchMetrics;
+  ShutdownState shutdownState;
 
   @Captor ArgumentCaptor<String> stringCaptor;
   @Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor;
@@ -157,6 +158,7 @@
 
     applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
     fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
+    shutdownState = new ShutdownState();
 
     objectUnderTest =
         new ReplicationQueue(
@@ -170,7 +172,8 @@
             applyObjectMetrics,
             fetchMetrics,
             LOCAL_INSTANCE_ID,
-            applyObjectsRefsFilter);
+            applyObjectsRefsFilter,
+            shutdownState);
   }
 
   @Test
@@ -346,7 +349,8 @@
             applyObjectMetrics,
             fetchMetrics,
             LOCAL_INSTANCE_ID,
-            applyObjectsRefsFilter);
+            applyObjectsRefsFilter,
+            shutdownState);
     Event event = new TestEvent("refs/multi-site/version");
     objectUnderTest.onEvent(event);
 
@@ -354,6 +358,12 @@
   }
 
   @Test
+  public void shouldSetShutdownStateWhenStopping() throws IOException {
+    objectUnderTest.stop();
+    assertThat(shutdownState.isShuttingDown()).isTrue();
+  }
+
+  @Test
   public void shouldSkipEventWhenStarredChangesRef() {
     Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
     objectUnderTest.onEvent(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 394a8a6..9640b82 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -18,12 +18,16 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.ShutdownState;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,13 +38,18 @@
 public class EventsBrokerMessageConsumerTest {
 
   @Mock private StreamEventListener eventListener;
-  @Mock DynamicItem<BrokerApi> eventsBroker;
+  @Mock DynamicItem<BrokerApi> eventsBrokerDynamicItem;
+  @Mock BrokerApi eventsBroker;
 
   EventsBrokerMessageConsumer objectUnderTest;
+  ShutdownState shutdownState;
 
   @Before
   public void setup() {
-    objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName");
+    shutdownState = new ShutdownState();
+    objectUnderTest =
+        new EventsBrokerMessageConsumer(
+            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
   }
 
   @Test
@@ -63,4 +72,16 @@
     doNothing().when(eventListener).fetchRefsForEvent(any());
     objectUnderTest.accept(new RefUpdatedEvent());
   }
+
+  @Test
+  public void shouldStillAcceptLastEventDuringShutdownAndThenDisconnect()
+      throws AuthException, PermissionBackendException {
+    doNothing().when(eventListener).fetchRefsForEvent(any());
+    when(eventsBrokerDynamicItem.get()).thenReturn(eventsBroker);
+
+    shutdownState.setIsShuttingDown(true);
+
+    objectUnderTest.accept(new RefUpdatedEvent());
+    verify(eventsBroker, times(1)).disconnect();
+  }
 }