Fix issue with task cleanup after retry

Destination.notifyFinished method calls finish on
ReplicationTasksStorage.Task objects which are not scheduled for retry.

The issue is that for rescheduled tasks PushOne.isRetrying
will always returns true even if task is already replicated.
That creates a situation where tasks scheduled for retry are
never cleaned up.

Bug: Issue 12754
Change-Id: I4b10c2752da6aa7444f57c3ce4ab70eb00c3f14e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index f2a077e..45a9201 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -225,6 +225,10 @@
     return maxRetries == 0 || retryCount <= maxRetries;
   }
 
+  private void retryDone() {
+    this.retrying = false;
+  }
+
   void canceledByReplication() {
     canceled = true;
   }
@@ -353,6 +357,7 @@
       git = gitManager.openRepository(projectName);
       runImpl();
       long elapsed = NANOSECONDS.toMillis(context.stop());
+      retryDone();
       repLog.info(
           "Replication to {} completed in {}ms, {}ms delay, {} retries",
           uri,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 8fb0a19..29908b5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -51,8 +51,13 @@
   protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
   protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
   protected static final int TEST_PUSH_TIME_SECONDS = 1;
+  protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
   protected static final Duration TEST_PUSH_TIMEOUT =
       Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+  protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+              + TEST_PROJECT_CREATION_SECONDS);
 
   @Inject protected SitePaths sitePaths;
   protected Path gitPath;
@@ -153,4 +158,12 @@
   protected void reloadConfig() {
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
   }
+
+  protected boolean nonEmptyProjectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return !r.getAllRefsByPeeledObjectId().isEmpty();
+    } catch (Exception e) {
+      return false;
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 706a2c6..a6ccbec 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -45,16 +45,10 @@
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
 public class ReplicationIT extends ReplicationDaemon {
-  private static final int TEST_PROJECT_CREATION_SECONDS = 10;
   private static final Duration TEST_TIMEOUT =
       Duration.ofSeconds(
           (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + 1);
 
-  private static final Duration TEST_NEW_PROJECT_TIMEOUT =
-      Duration.ofSeconds(
-          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
-              + TEST_PROJECT_CREATION_SECONDS);
-
   @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
 
   @Test
@@ -311,14 +305,6 @@
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
   }
 
-  private boolean nonEmptyProjectExists(Project.NameKey name) {
-    try (Repository r = repoManager.openRepository(name)) {
-      return !r.getAllRefsByPeeledObjectId().isEmpty();
-    } catch (Exception e) {
-      return false;
-    }
-  }
-
   private ObjectId createNewBranchWithoutPush(String fromBranch, String newBranch)
       throws Exception {
     try (Repository repo = repoManager.openRepository(project);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index db0f7ee..9392f0d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.reviewdb.client.Project;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +42,10 @@
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
 public class ReplicationStorageIT extends ReplicationDaemon {
+  protected static final int TEST_TASK_FINISH_SECONDS = 1;
+  protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
+  protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+      Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
   private ReplicationTasksStorage tasksStorage;
 
   @Override
@@ -214,6 +219,21 @@
     }
   }
 
+  @Test
+  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(tasksStorage.list()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
+
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(new Project.NameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
+    WaitUtil.waitUntil(() -> tasksStorage.list().size() == 0, TEST_TASK_FINISH_TIMEOUT);
+  }
+
   private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
       String changeRef, String remote) {
     return tasksStorage.list().stream()