Ensure states are updated for canceled replication tasks

When using 'replication start --wait' we need
ReplicationState.waitForReplication() to return when the task we're
waiting on has been canceled, either through an admin action or because
the replication distributor determined another node already completed
it.

Add a couple tests for PushAll that confirm this behavior was previously
broken and is fixed now.

Change-Id: I36320ae079af5d7673e05d20ddc94b42a9b04347
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 8ef21d0..baf0328 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -459,6 +459,7 @@
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
       pending.remove(uri);
+      pushOp.notifyNotAttempted(pushOp.getRefs());
     }
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a174e91..17c8933 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@
   }
 
   @Test
+  public void pushAllWait() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    future.get();
+    state.waitForReplication();
+  }
+
+  @Test
+  public void pushAllWaitCancelNotRunningTask() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Executor service = Executors.newSingleThreadExecutor();
+    service.execute(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              future.get();
+              state.waitForReplication();
+              latch.countDown();
+            } catch (Exception e) {
+              // fails the test because we don't countDown
+            }
+          }
+        });
+
+    // Cancel the replication task
+    waitUntil(() -> getProjectTasks().size() != 0);
+    WorkQueue.Task<?> task = getProjectTasks().get(0);
+    assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+    task.cancel(false);
+
+    // Confirm our waiting thread completed
+    boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+    assertThat(receivedSignal).isTrue();
+  }
+
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(t -> t instanceof WorkQueue.ProjectTask)
+        .collect(Collectors.toList());
+  }
+
+  @Test
   public void shouldReplicateHeadUpdate() throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();