Merge "ReplicationState: Streamline getRefStatus()"
diff --git a/BUILD b/BUILD
index 4c74bf2..ee97660 100644
--- a/BUILD
+++ b/BUILD
@@ -14,9 +14,6 @@
         "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.SshModule",
     ],
     resources = glob(["src/main/resources/**/*"]),
-    deps = [
-        "//lib/auto:auto-value-gson",
-    ],
 )
 
 junit_tests(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index e0a9354..8ef21d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -63,6 +63,7 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
@@ -70,7 +71,6 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -463,7 +463,7 @@
   }
 
   void scheduleDeleteProject(URIish uri, Project.NameKey project, ProjectDeletionState state) {
-    repLog.atFine().log("scheduling deletion of project {} at {}", project, uri);
+    repLog.atFine().log("scheduling deletion of project %s at %s", project, uri);
     @SuppressWarnings("unused")
     ScheduledFuture<?> ignored =
         pool.schedule(deleteProjectFactory.create(uri, project, state), 0, TimeUnit.SECONDS);
@@ -608,15 +608,16 @@
     }
   }
 
-  public Set<String> getPrunableTaskNames() {
-    Set<String> names = new HashSet<>();
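+  /** Maps each pending ref update to the name of the queue task that will replicate it. */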
+  public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {
+    Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>();
     for (PushOne push : pending.values()) {
-      if (!replicationTasksStorage.get().isWaiting(push)) {
-        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
-        names.add(push.toString());
+      String taskName = push.toString();
+      for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) {
+        taskNameByReplicateRefUpdate.put(refUpdate, taskName);
       }
     }
-    return names;
+    return taskNameByReplicateRefUpdate;
   }
 
   boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index d195aa3..0fa02ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -68,7 +68,8 @@
       if (exitCode == 1) {
         logger.atInfo().log(
             "DeleteProject plugin is not installed on %s;"
-                + " will not try to forward this operation to that host");
+                + " will not try to forward this operation to that host",
+            uri);
         withoutDeleteProjectPlugin.add(uri);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 404d4bd..87c35ee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -468,6 +468,7 @@
             pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
           }
         } else {
+          retryDone();
           repLog.atSevere().log(
               "Giving up after %d '%s' failures during replication to %s",
               updateRefRetryCount, e.getMessage(), uri);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index ed474ae..5310c14 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -41,6 +41,7 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +70,11 @@
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private Distributor distributor;
 
+  protected enum Prune {
+    TRUE,
+    FALSE;
+  }
+
   @Inject
   ReplicationQueue(
       ReplicationConfig rc,
@@ -94,7 +100,7 @@
       destinations.get().startup(workQueue);
       running = true;
       replicationTasksStorage.recoverAll();
-      firePendingEvents();
+      synchronizePendingEvents(Prune.FALSE);
       fireBeforeStartupEvents();
       distributor = new Distributor(workQueue);
     }
@@ -193,8 +199,18 @@
     }
   }
 
-  private void firePendingEvents() {
+  private void synchronizePendingEvents(Prune prune) {
     if (replaying.compareAndSet(false, true)) {
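+      // When pruning, snapshot the queue task name of every ref update pending on any
+      // destination. Entries still present after the persisted updates are replayed below
+      // correspond to tasks no longer backed by storage, which onDone() then prunes.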
+      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+          new ConcurrentHashMap<>();
+      if (Prune.TRUE.equals(prune)) {
+        for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+          taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
+        }
+      }
       new ChainedScheduler.StreamScheduler<>(
           workQueue.getDefaultQueue(),
           replicationTasksStorage.streamWaiting(),
@@ -203,6 +219,9 @@
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
                 fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+                if (Prune.TRUE.equals(prune)) {
+                  taskNamesByReplicateRefUpdate.remove(u);
+                }
               } catch (URISyntaxException e) {
                 repLog.atSevere().withCause(e).log(
                     "Encountered malformed URI for persisted event %s", u);
@@ -213,6 +232,9 @@
 
             @Override
             public void onDone() {
+              if (Prune.TRUE.equals(prune)) {
+                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+              }
               replaying.set(false);
             }
 
@@ -224,17 +246,12 @@
     }
   }
 
-  private void pruneCompleted() {
+  private void pruneNoLongerPending(Set<String> prunableTaskNames) {
     // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
     // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
-    // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
-    // do use the same name as returned by toString() though, so that be used to correlate
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
     // PushOnes with queue tasks despite their wrappers.
-    Set<String> prunableTaskNames = new HashSet<>();
-    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
-      prunableTaskNames.addAll(destination.getPrunableTaskNames());
-    }
-
     for (WorkQueue.Task<?> task : workQueue.getTasks()) {
       WorkQueue.Task.State state = task.getState();
       if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
@@ -310,8 +327,7 @@
         return;
       }
       try {
-        firePendingEvents();
-        pruneCompleted();
+        synchronizePendingEvents(Prune.TRUE);
       } catch (Exception e) {
         repLog.atSevere().withCause(e).log("error distributing tasks");
       }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index ded9d84..d216366 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,11 +16,9 @@
 local path as replication target. This makes e.g. sense if a network
 share is mounted to which the repositories should be replicated.
 
-In multi-primary scenario, any replication work which is already
-in-flight or completed by the other nodes is not performed to
-avoid extra work. This is because, the storage for replication
-events is shared between multiple primaries.(The storage location
-is specified in the config using: `replication.eventsDirectory`).
+It is possible to
+[configure](config.html#configuring-cluster-replication) the plugin so
+that multiple primaries share the replication work approximately evenly.
 
 Replication of account data (NoteDb)
 ------------------------------------
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f4ea9d6..8843671 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -46,6 +46,86 @@
 To manually trigger replication at runtime, see
 SSH command [start](cmd-start.md).
 
+<a name="configuring-cluster-replication"></a>
+Configuring Cluster Replication
+-------------------------------
+
+The replication plugin is designed to allow multiple primaries in a
+cluster to cooperate efficiently. This cooperation is based on the
+replication event persistence subsystem, so the directory pointed to
+by the replication.eventsDirectory config key must reside on a shared
+filesystem, such as NFS, to enable it. By default, simply pointing
+multiple primaries to the same eventsDirectory enables some
+cooperation by preventing the same replication push from being
+duplicated by more than one primary.
+
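+A minimal sketch of this baseline setup in `replication.config` (the
+shared path below is only an example):
+
+```
+[replication]
+  eventsDirectory = /shared/replication/events
+```
+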
+To further improve cooperation across the cluster, the
+replication.distributionInterval config value can be set. With
+distribution enabled, the replication queues of all the nodes sharing
+the same eventsDirectory will reflect approximately the same
+outstanding replication work (i.e. tasks waiting in the queue).
+Replication pushes which are already running will continue to be
+visible only in the queue of the node on which the push is actually
+happening. This feature not only gives administrators a cluster-wide
+view of outstanding replication tasks, it also allows replication
+tasks triggered by one primary to be fulfilled by another, less busy node.
+
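+For example, to have each node synchronize its queue with the shared
+events directory once per minute (the interval is in seconds; the
+value below is illustrative):
+
+```
+[replication]
+  distributionInterval = 60
+```
+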
+This enhanced work distribution allows the amount of replication work
+a cluster can handle to scale more evenly and linearly with the number
+of primaries in the cluster. Adding more nodes to a cluster without
+distribution enabled will generally not allow the thread count per
+remote to be reduced without impacting service levels to those
+remotes. This is because, without distribution, all events triggered
+by a node will only be fulfilled by the node which triggered them,
+even if all the other nodes in the cluster are idle. This behavior
+implies that each node should be configured in a way that allows it
+alone to provide the level of service which each remote requires.
+However, with distribution enabled, it becomes possible to reduce the
+number of replication threads configured per remote in proportion to
+the number of nodes in the cluster while maintaining approximately the
+same service level as before adding new nodes.
+
+Reducing threads per remote without impacting service is possible with
+distribution because, when configuring a node, it can be expected that
+other nodes will pick up some of the work it triggers, so it no longer
+needs to be configured as if it were the only node in the cluster. For
+example, if a remote requires 6 threads with one node to achieve
+acceptable service, it should only take 2 threads on each of 3
+equivalently powered nodes to provide the same service level with
+distribution enabled. Scaling down the thread requirements per remote
+reduces the per-remote memory footprint on each node, which enables
+the nodes in the cluster to scale to handle more remotes at
+approximately the same service level as without distribution. The
+number of additional supported remotes then also scales approximately
+linearly with the extra nodes in the cluster.
+
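+Continuing the example above, each of the three nodes could then be
+configured with a reduced thread count for that remote (the remote
+name and url are illustrative):
+
+```
+[remote "mirror"]
+  url = gerrit2@mirror1.example.com:/some/path/${name}.git
+  threads = 2
+```
+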
 File `replication.config`
 -------------------------
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 202d77e..9606371 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.LocalDiskRepositoryManager;
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.nio.file.Path;
@@ -71,6 +72,18 @@
   protected Path gitPath;
   protected FileBasedConfig config;
 
+  protected void setDistributionInterval(int interval) throws IOException {
+    config.setInt("replication", null, "distributionInterval", interval);
+    config.save();
+  }
+
+  protected String getProjectUri(Project.NameKey project) throws Exception {
+    return ((LocalDiskRepositoryManager) repoManager)
+        .getBasePath(project)
+        .resolve(project.get() + ".git")
+        .toString();
+  }
+
   protected void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
     setReplicationDestination(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
new file mode 100644
index 0000000..3d0dfc1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/**
+ * The tests in this class ensure the correctness of {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationQueue.Distributor}.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDistributorIT extends ReplicationStorageDaemon {
+  private static final int TEST_DISTRIBUTION_INTERVAL_SECONDS = 3;
+  private static final int TEST_DISTRIBUTION_DURATION_SECONDS = 1;
+  private static final int TEST_DISTRIBUTION_CYCLE_SECONDS =
+      TEST_DISTRIBUTION_INTERVAL_SECONDS + TEST_DISTRIBUTION_DURATION_SECONDS;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    setDistributionInterval(TEST_DISTRIBUTION_INTERVAL_SECONDS);
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void distributorAddingTaskFromStorage() throws Exception {
+    String remote = "foo";
+    String replica = "replica";
+    String master = "refs/heads/master";
+    String newBranch = "refs/heads/foo_branch";
+    Project.NameKey targetProject = createTestProject(project + replica);
+    ReplicationTasksStorage.ReplicateRefUpdate ref =
+        ReplicationTasksStorage.ReplicateRefUpdate.create(
+            project.get(), newBranch, new URIish(getProjectUri(targetProject)), remote);
+    createBranch(project, master, newBranch);
+    setReplicationDestination(remote, replica, ALL_PROJECTS);
+    reloadConfig();
+
+    tasksStorage.create(ref); // Mimics a ref update inserted into storage by another primary
+    WaitUtil.waitUntil(
+        () -> getProjectTasks().size() != 0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS));
+
+    List<WorkQueue.Task<?>> tasks = getProjectTasks();
+    assertThat(tasks).hasSize(1); // ReplicationTask for the created ref in queue
+    assertThat(waitForProjectTaskCount(0, TEST_PUSH_TIMEOUT)).isTrue();
+
+    try (Repository targetRepo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(targetRepo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void distributorPrunesTaskFromWorkQueue() throws Exception {
+    createTestProject(project + "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String newBranch = "refs/heads/foo_branch";
+    createBranch(BranchNameKey.create(project, newBranch));
+
+    assertThat(listWaitingReplicationTasks(newBranch)).hasSize(1);
+    deleteWaitingReplicationTasks(newBranch); // Simulates the work being started by another node
+
+    assertThat(waitForProjectTaskCount(0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS)))
+        .isTrue();
+  }
+
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(t -> t instanceof WorkQueue.ProjectTask)
+        .collect(Collectors.toList());
+  }
+
+  private void createBranch(Project.NameKey project, String fromRef, String refToCreate)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(project)) {
+      Ref from = repo.exactRef(fromRef);
+      RefUpdate createBranch = repo.updateRef(refToCreate);
+      createBranch.setNewObjectId(from.getObjectId());
+      createBranch.update();
+    }
+  }
+
+  private boolean waitForProjectTaskCount(int count, Duration duration) {
+    try {
+      WaitUtil.waitUntil(() -> getProjectTasks().size() == count, duration);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index e508b32..e2e1e21 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -250,26 +250,6 @@
     replicateBranchDeletion(false);
   }
 
-  private void replicateBranchDeletion(boolean mirror) throws Exception {
-    setReplicationDestination("foo", "replica", ALL_PROJECTS);
-    reloadConfig();
-
-    Project.NameKey targetProject = createTestProject(project + "replica");
-    String branchToDelete = "refs/heads/todelete";
-    String master = "refs/heads/master";
-    BranchInput input = new BranchInput();
-    input.revision = master;
-    gApi.projects().name(project.get()).branch(branchToDelete).create(input);
-    isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
-
-    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
-    reloadConfig();
-
-    gApi.projects().name(project.get()).branch(branchToDelete).delete();
-
-    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
-  }
-
   @Test
   public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
     setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
@@ -332,6 +312,26 @@
     WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
   }
 
+  private void replicateBranchDeletion(boolean mirror) throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String branchToDelete = "refs/heads/todelete";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(branchToDelete).create(input);
+    isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
+    reloadConfig();
+
+    gApi.projects().name(project.get()).branch(branchToDelete).delete();
+
+    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
+  }
+
   private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
     PushOne pushOne = queue.pending.get(uri);
     return pushOne == null ? false : pushOne.isRetrying();