Merge "Avoid starting tasks which are not present under waiting storage area"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 8d7227f..12f0273 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -583,7 +583,7 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      op.setRefs(replicationTasksStorage.get().start(op));
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index bdef2fa..565790c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -270,6 +270,12 @@
     }
   }
 
+  void setRefs(Set<String> refs) {
+    pushAllRefs = false;
+    delta.clear();
+    addRefs(refs);
+  }
+
   void addState(String ref, ReplicationState state) {
     stateMap.put(ref, state);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index d6cc8be..4736402 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,7 +32,9 @@
 import java.nio.file.NotDirectoryException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Stream;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
@@ -93,6 +95,11 @@
 
     public abstract String remote();
 
+    public String sha1() {
+      return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote())
+          .name();
+    }
+
     @Override
     public final String toString() {
       return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
@@ -127,10 +134,15 @@
     return new Task(r).create();
   }
 
-  public synchronized void start(UriUpdates uriUpdates) {
+  public synchronized Set<String> start(UriUpdates uriUpdates) {
+    Set<String> startedRefs = new HashSet<>();
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
-      new Task(update).start();
+      Task t = new Task(update);
+      if (t.start()) {
+        startedRefs.add(t.update.ref());
+      }
     }
+    return startedRefs;
   }
 
   public synchronized void reset(UriUpdates uriUpdates) {
@@ -182,7 +194,7 @@
   }
 
   @SuppressWarnings("deprecation")
-  private ObjectId sha1(String s) {
+  private static ObjectId sha1(String s) {
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
@@ -203,9 +215,7 @@
 
     public Task(ReplicateRefUpdate update) {
       this.update = update;
-      String key =
-          update.project() + "\n" + update.ref() + "\n" + update.uri() + "\n" + update.remote();
-      taskKey = sha1(key).name();
+      taskKey = update.sha1();
       running = createDir(runningUpdates).resolve(taskKey);
       waiting = createDir(waitingUpdates).resolve(taskKey);
     }
@@ -228,8 +238,8 @@
       return taskKey;
     }
 
-    public void start() {
-      rename(waiting, running);
+    public boolean start() {
+      return rename(waiting, running);
     }
 
     public void reset() {
@@ -253,12 +263,14 @@
       }
     }
 
-    private void rename(Path from, Path to) {
+    private boolean rename(Path from, Path to) {
       try {
         logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
         Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+        return true;
       } catch (IOException e) {
         logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+        return false;
       }
     }
 
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index adb8d4c..ded9d84 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,6 +16,12 @@
 local path as replication target. This makes e.g. sense if a network
 share is mounted to which the repositories should be replicated.
 
+In multi-primary scenario, any replication work which is already
+in-flight or completed by the other nodes is not performed to
+avoid extra work. This is because, the storage for replication
+events is shared between multiple primaries.(The storage location
+is specified in the config using: `replication.eventsDirectory`).
+
 Replication of account data (NoteDb)
 ------------------------------------
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 420cdf8..48d14e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -50,11 +50,14 @@
   protected static final Optional<String> ALL_PROJECTS = Optional.empty();
 
   protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+  protected static final int TEST_LONG_REPLICATION_DELAY_SECONDS = 30;
   protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
   protected static final int TEST_PUSH_TIME_SECONDS = 1;
   protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
   protected static final Duration TEST_PUSH_TIMEOUT =
       Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+  protected static final Duration TEST_PUSH_TIMEOUT_LONG =
+      Duration.ofSeconds(TEST_LONG_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
   protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
       Duration.ofSeconds(
           (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
new file mode 100644
index 0000000..bcbbc8a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -0,0 +1,89 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class can be extended by any ReplicationStorage*IT class and provides common setup and
+ * helper methods.
+ */
+public class ReplicationStorageDaemon extends ReplicationDaemon {
+  protected static final int TEST_TASK_FINISH_SECONDS = 1;
+  protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
+  protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+      Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+  protected static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+                  * TEST_REPLICATION_MAX_RETRIES
+              + 10);
+  protected ReplicationTasksStorage tasksStorage;
+  protected DestinationsCollection destinationCollection;
+  protected ReplicationConfig replicationConfig;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
+    replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
+  }
+
+  protected boolean noIncompleteTasks() {
+    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+    Path runningUpdates = refUpdates.resolve("running");
+    Path waitingUpdates = refUpdates.resolve("waiting");
+    try {
+      return Files.list(runningUpdates).count() == 0 && Files.list(waitingUpdates).count() == 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  }
+
+  protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaitingReplicationTasks(
+      String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage
+        .streamWaiting()
+        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+        .collect(toList());
+  }
+
+  protected void deleteWaitingReplicationTasks(String refRegex) {
+    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+    Path waitingUpdates = refUpdates.resolve("waiting");
+    for (ReplicationTasksStorage.ReplicateRefUpdate r : listWaitingReplicationTasks(refRegex)) {
+      try {
+        Files.deleteIfExists(waitingUpdates.resolve(r.sha1()));
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't delete waiting task", e);
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index a65b257..753e429 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -16,7 +16,6 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
 
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
@@ -29,7 +28,6 @@
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -51,32 +49,7 @@
 @TestPlugin(
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationStorageIT extends ReplicationDaemon {
-  private static final int TEST_TASK_FINISH_SECONDS = 1;
-  private static final int TEST_REPLICATION_MAX_RETRIES = 1;
-  protected static final Duration TEST_TASK_FINISH_TIMEOUT =
-      Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
-  private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
-      Duration.ofSeconds(
-          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
-                  * TEST_REPLICATION_MAX_RETRIES
-              + 10);
-  protected ReplicationTasksStorage tasksStorage;
-  private DestinationsCollection destinationCollection;
-  private ReplicationConfig replicationConfig;
-
-  @Override
-  public void setUpTestPlugin() throws Exception {
-    initConfig();
-    setReplicationDestination(
-        "remote1",
-        "suffix1",
-        Optional.of("not-used-project")); // Simulates a full replication.config initialization
-    super.setUpTestPlugin();
-    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
-    destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
-    replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
-  }
+public class ReplicationStorageIT extends ReplicationStorageDaemon {
 
   @Test
   public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
@@ -389,14 +362,6 @@
         .filter(task -> remote.equals(task.remote()));
   }
 
-  private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
-    Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage
-        .streamWaiting()
-        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
-        .collect(toList());
-  }
-
   private List<ReplicateRefUpdate> listWaiting() {
     return tasksStorage.streamWaiting().collect(Collectors.toList());
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
new file mode 100644
index 0000000..65fc4df
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.junit.Test;
+
+/**
+ * The tests in this class ensure that events in the storage are correctly managed under multi-
+ * primary scenarios.
+ *
+ * @see com.googlesource.gerrit.plugins.replication.ReplicationStorageIT
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageMPIT extends ReplicationStorageDaemon {
+
+  @Test
+  public void workFromOnlyWaitingIsPerformed() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, TEST_LONG_REPLICATION_DELAY_SECONDS);
+    reloadConfig();
+
+    String newBranchA = "refs/heads/foo_branch_a";
+    String newBranchB = "refs/heads/foo_branch_b";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranchA).create(input);
+    gApi.projects().name(project.get()).branch(newBranchB).create(input);
+
+    deleteWaitingReplicationTasks(
+        newBranchA); // This simulates the work being completed by other node
+    assertThat(listWaitingReplicationTasks("refs/heads/foo_branch_.*")).hasSize(1);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(
+          () -> checkedGetRef(repo, newBranchA) == null && checkedGetRef(repo, newBranchB) != null,
+          TEST_PUSH_TIMEOUT_LONG);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRefA = getRef(repo, newBranchA);
+      Ref targetBranchRefB = getRef(repo, newBranchB);
+      assertThat(targetBranchRefA).isNull();
+      assertThat(targetBranchRefB).isNotNull();
+      assertThat(targetBranchRefB.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+
+    assertTrue(noIncompleteTasks());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 25752b4..5cfb2d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.truth.Truth.assertThat;
 import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
 import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream;
 
@@ -194,6 +195,18 @@
     assertNoIncompleteTasks(persistedView);
   }
 
+  @Test
+  public void duplicateWorkIsNotPerformed() {
+    nodeA.create(REF_UPDATE);
+    nodeB.create(REF_UPDATE);
+
+    assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref());
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+
+    assertThat(nodeB.start(URI_UPDATES)).isEmpty();
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+  }
+
   public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
     try {
       return TestUriUpdates.create(refUpdate);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 48e145b..16a0363 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -84,7 +84,7 @@
   @Test
   public void canStartWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
-    storage.start(uriUpdates);
+    assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref());
     assertThatStream(storage.streamWaiting()).isEmpty();
     assertFalse(storage.isWaiting(uriUpdates));
     assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);