Merge branch 'stable-2.15' into stable-2.16

* stable-2.15:
  Cancel pending replications upon shutdown
  Remove replication event from pending when runway is allowed

Change-Id: I81209708a42832cbe2be818e2dd50f625362b392
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index bff96ac..02304f3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -70,6 +70,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.apache.commons.io.FilenameUtils;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
@@ -223,12 +224,34 @@
   public int shutdown() {
     int cnt = 0;
     if (pool != null) {
+      repLog.warn("Cancelling replication events");
+
+      foreachPushOp(
+          pending,
+          push -> {
+            push.cancel();
+            return null;
+          });
+      pending.clear();
+      foreachPushOp(
+          inFlight,
+          push -> {
+            push.setCanceledWhileRunning();
+            return null;
+          });
+      inFlight.clear();
       cnt = pool.shutdownNow().size();
       pool = null;
     }
     return cnt;
   }
 
+  private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+    for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
+      pushOneFunction.apply(pushOne);
+    }
+  }
+
   private boolean shouldReplicate(ProjectState state, CurrentUser user)
       throws PermissionBackendException {
     if (!config.replicateHiddenProjects()
@@ -341,7 +364,7 @@
     if (!config.replicatePermissions()) {
       PushOne e;
       synchronized (stateLock) {
-        e = pending.get(uri);
+        e = getPendingPush(uri);
       }
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
@@ -364,7 +387,7 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = pending.get(uri);
+      PushOne e = getPendingPush(uri);
       if (e == null) {
         e = opFactory.create(project, uri);
         addRef(e, ref);
@@ -380,6 +403,14 @@
     }
   }
 
+  private PushOne getPendingPush(URIish uri) {
+    PushOne e = pending.get(uri);
+    if (e != null && !e.wasCanceled()) {
+      return e;
+    }
+    return null;
+  }
+
   void pushWasCanceled(PushOne pushOp) {
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
@@ -416,7 +447,7 @@
   void reschedule(PushOne pushOp, RetryReason reason) {
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
-      PushOne pendingPushOp = pending.get(uri);
+      PushOne pendingPushOp = getPendingPush(uri);
 
       if (pendingPushOp != null) {
         // There is one PushOp instance already pending to same URI.
@@ -493,11 +524,11 @@
       if (op.wasCanceled()) {
         return RunwayStatus.canceled();
       }
-      pending.remove(op.getURI());
       PushOne inFlightOp = inFlight.get(op.getURI());
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      pending.remove(op.getURI());
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index d02482e..88a301b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -151,14 +151,17 @@
 
   @Override
   public void cancel() {
-    repLog.info("Replication {} was canceled", getURI());
+    repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
     canceledByReplication();
     pool.pushWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info("Replication {} was canceled while being executed", getURI());
+    repLog.info(
+        "Replication [{}] to {} was canceled while being executed",
+        HexFormat.fromInt(id),
+        getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -202,7 +205,7 @@
   }
 
   boolean wasCanceled() {
-    return canceled;
+    return canceled || canceledWhileRunning.get();
   }
 
   URIish getURI() {