Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  ReplicationIT: speedup drain replication queue tests
  ReplicationIT: Remove unused variables in tests
  Reformat with GJF
  Option to drain the event queue before shutdown

Change-Id: I02a30f057aba348f2977c12b669c17a560d710b1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 2a329e2..679776f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -723,6 +723,14 @@
     return config.getMaxRetries();
   }
 
+  public int getDrainQueueAttempts() {
+    return config.getDrainQueueAttempts();
+  }
+
+  public int getReplicationDelaySeconds() {
+    return config.getDelay() * 1000; // NOTE(review): used as Thread.sleep() millis, not seconds
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index b2d0de2..f688cfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@
 public class DestinationConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
+  static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0; // fallback for "drainQueueAttempts"
 
   private final int delay;
   private final int rescheduleDelay;
   private final int retryDelay;
+  private final int drainQueueAttempts; // never negative: clamped with Math.max(0, ...)
   private final int lockErrorMaxRetries;
   private final ImmutableList<String> adminUrls;
   private final int poolThreads;
@@ -50,6 +52,8 @@
     projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
     adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
     retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+    drainQueueAttempts = // negative configured values are clamped to 0
+        Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
     poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
     authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
     lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
@@ -77,6 +81,10 @@
     return retryDelay;
   }
 
+  public int getDrainQueueAttempts() {
+    return drainQueueAttempts; // set from the "drainQueueAttempts" setting, never negative
+  }
+
   public int getPoolThreads() {
     return poolThreads;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index a075f12..ea6d361 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -311,11 +311,71 @@
   public int shutdown() {
     int discarded = 0;
     for (Destination cfg : destinations) {
-      discarded += cfg.shutdown();
+      try {
+        drainReplicationEvents(cfg);
+      } catch (EventQueueNotEmptyException e) {
+        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+      } finally {
+        // Shut the destination down even if its queue could not be drained.
+        discarded += cfg.shutdown();
+      }
     }
     return discarded;
   }
 
+  /**
+   * Waits for the pending and in-flight replication events of a destination to be processed.
+   *
+   * <p>Does nothing when the destination's drainQueueAttempts is 0. Otherwise the queue is polled
+   * up to drainQueueAttempts times, sleeping for the destination's replication delay between
+   * polls. Waiting stops early if the current thread is interrupted; the interrupt status is
+   * restored so that callers can observe it.
+   *
+   * @param destination the destination whose event queue should be drained
+   * @throws EventQueueNotEmptyException if events remain queued when waiting ends
+   */
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        // Thread.sleep takes milliseconds; the getter already multiplies the delay by 1000.
+        Thread.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+        // Restore the interrupt status and stop waiting instead of swallowing the interrupt.
+        Thread.currentThread().interrupt();
+        break;
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  /** Signals that replication events were still queued after waiting for the queue to drain. */
+  public static class EventQueueNotEmptyException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
   FileBasedConfig getConfig() {
     return config;
   }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f58845d..32bc630 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -297,6 +297,20 @@
 
 	By default, use replication.maxRetries.
 
+remote.NAME.drainQueueAttempts
+:	Maximum number of attempts to drain the replication event queue before
+	stopping the plugin.
+
+	When stopping the plugin, shutdown is postponed while the plugin waits
+	for the event queue to drain.
+
+	The delay is at most "drainQueueAttempts" * "replicationDelay" seconds per remote.
+
+	When not set or set to 0, the queue is not drained and the pending
+	replication events are cancelled.
+
+	By default, do not drain replication events.
+
 remote.NAME.threads
 :	Number of worker threads to dedicate to pushing to the
 	repositories described by this remote.  Each thread can push
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 31aab02..03d9295 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -245,6 +245,52 @@
     }
   }
 
+  @Test
+  public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject =
+        projectOperations.newProject().name(project + "replica").create();
+    String remoteName = "doNotDrainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    Result pushResult = createChange();
+    shutdownConfig(); // drainQueueAttempts unset for this remote: no draining expected
+
+    pushResult.getCommit(); // NOTE(review): result is discarded; call looks removable
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    exception.expect(InterruptedException.class); // presumably waitUntil's timeout - confirm
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+    }
+  }
+
+  @Test
+  public void shouldDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject =
+        projectOperations.newProject().name(project + "replica").create();
+    String remoteName = "drainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    config.setInt("remote", remoteName, "drainQueueAttempts", 2); // up to two drain rounds
+    config.save();
+    reloadConfig();
+
+    Result pushResult = createChange();
+    shutdownConfig(); // expected to wait for the queued replication before stopping
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
@@ -285,6 +331,10 @@
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
   }
 
+  private void shutdownConfig() { // test helper: shuts down the AutoReloadConfigDecorator
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+  }
+
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
     return tasksStorage.list().stream()