Merge branch 'stable-3.4' into 'stable-3.5'

* stable-3.4:
  Do not retry replication when local repository not found
  Ensure states are updated for canceled replication tasks

Change-Id: Ic0e523c7c92222b49916aed7067836d4e6d2f6e7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 00a46de..ac4b671 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -458,6 +458,7 @@
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
       pending.remove(uri);
+      pushOp.notifyNotAttempted(pushOp.getRefs());
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 8b3c9e2..8c53028 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -432,8 +432,12 @@
           "Replication to %s completed in %dms, %dms delay, %d retries",
           uri, elapsed, delay, retryCount);
     } catch (RepositoryNotFoundException e) {
+      retryDone();
       stateLog.error(
-          "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
+          "Cannot replicate "
+              + projectName
+              + "; Local repository does not exist: "
+              + e.getMessage(),
           getStatesAsArray());
 
     } catch (RemoteRepositoryException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index bb3e886..f9d15f2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.truth.Truth.assertThat;
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -49,6 +50,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
@@ -261,6 +263,17 @@
     verify(transportMock, times(1)).push(any(), any());
   }
 
+  @Test
+  public void shouldNotKeepRetryingWhenRepositoryNotFound() throws Exception {
+    when(gitRepositoryManagerMock.openRepository(projectNameKey))
+        .thenThrow(new RepositoryNotFoundException("not found")); // opening the local repo fails
+    PushOne pushOne = createPushOne(null);
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.setToRetry(); // presumably puts the task into the retrying state - confirm in PushOne
+    pushOne.run();
+    assertThat(pushOne.isRetrying()).isFalse(); // after run() the task must no longer report retrying
+  }
+
   private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
     ObjectIdRef barLocalRef =
         new ObjectIdRef.Unpeeled(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 33bd91d..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@
   }
 
   @Test
+  public void pushAllWait() throws Exception { // schedules a PushAll and blocks until it is done
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS); // zero delay
+
+    future.get(); // wait for the scheduled PushAll task itself
+    state.waitForReplication(); // no assertions: the test passes once both waits return
+  }
+
+  @Test
+  public void pushAllWaitCancelNotRunningTask() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    CountDownLatch latch = new CountDownLatch(1); // counted down only if the waiter finishes normally
+    Executor service = Executors.newSingleThreadExecutor(); // NOTE(review): never shut down here
+    service.execute(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              future.get();
+              state.waitForReplication();
+              latch.countDown();
+            } catch (Exception e) {
+              // Deliberately ignored: the latch stays at 1, so the assertion on latch.await fails
+            }
+          }
+        });
+
+    // Cancel the first queued project task, after asserting it is READY or SLEEPING
+    waitUntil(() -> getProjectTasks().size() != 0);
+    WorkQueue.Task<?> task = getProjectTasks().get(0);
+    assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+    task.cancel(false); // presumably "do not interrupt if running", as in Future.cancel - confirm
+
+    // Confirm our waiting thread completed, i.e. the latch was counted down within the timeout
+    boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+    assertThat(receivedSignal).isTrue();
+  }
+
+  private List<WorkQueue.Task<?>> getProjectTasks() { // WorkQueue tasks that are ProjectTask instances
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(t -> t instanceof WorkQueue.ProjectTask)
+        .collect(Collectors.toList());
+  }
+
+  @Test
   public void shouldReplicateHeadUpdate() throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();