Avoid checking for existing tasks while pruning

When the distributor runs, it now stores a snapshot of pending
pushes and then removes from this snapshot all the RefUpdates
which were found while adding pending persisted tasks. The
remaining pushes in the snapshot can now be pruned without
needing to do an existence check on them since they were no
longer stored persistently (and thus no longer needed to be
executed). This effectively makes pruning I/O less, thereby
reducing the load put by distributor on disk I/O.

Change-Id: I0916a57b302fd7d207fd31ec26df65d262a76124
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index dfe7e79..8ef21d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -63,6 +63,7 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
@@ -70,7 +71,6 @@
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -608,15 +608,15 @@
     }
   }
 
-  public Set<String> getPrunableTaskNames() {
-    Set<String> names = new HashSet<>();
+  public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {
+    Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>();
     for (PushOne push : pending.values()) {
-      if (!replicationTasksStorage.get().isWaiting(push)) {
-        repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
-        names.add(push.toString());
+      String taskName = push.toString();
+      for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) {
+        taskNameByReplicateRefUpdate.put(refUpdate, taskName);
       }
     }
-    return names;
+    return taskNameByReplicateRefUpdate;
   }
 
   boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index ed474ae..4abb295 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -41,6 +41,7 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +70,11 @@
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private Distributor distributor;
 
+  protected enum Prune {
+    TRUE,
+    FALSE;
+  }
+
   @Inject
   ReplicationQueue(
       ReplicationConfig rc,
@@ -94,7 +100,7 @@
       destinations.get().startup(workQueue);
       running = true;
       replicationTasksStorage.recoverAll();
-      firePendingEvents();
+      synchronizePendingEvents(Prune.FALSE);
       fireBeforeStartupEvents();
       distributor = new Distributor(workQueue);
     }
@@ -193,8 +199,14 @@
     }
   }
 
-  private void firePendingEvents() {
+  private void synchronizePendingEvents(Prune prune) {
     if (replaying.compareAndSet(false, true)) {
+      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+      if (Prune.TRUE.equals(prune)) {
+        for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+          taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
+        }
+      }
       new ChainedScheduler.StreamScheduler<>(
           workQueue.getDefaultQueue(),
           replicationTasksStorage.streamWaiting(),
@@ -203,6 +215,9 @@
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
                 fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+                if (Prune.TRUE.equals(prune)) {
+                  taskNamesByReplicateRefUpdate.remove(u);
+                }
               } catch (URISyntaxException e) {
                 repLog.atSevere().withCause(e).log(
                     "Encountered malformed URI for persisted event %s", u);
@@ -213,6 +228,9 @@
 
             @Override
             public void onDone() {
+              if (Prune.TRUE.equals(prune)) {
+                pruneNoLongerPending(taskNamesByReplicateRefUpdate.values());
+              }
               replaying.set(false);
             }
 
@@ -224,17 +242,12 @@
     }
   }
 
-  private void pruneCompleted() {
+  private void pruneNoLongerPending(Collection<String> prunableTaskNames) {
     // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
     // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
-    // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
-    // do use the same name as returned by toString() though, so that be used to correlate
+    // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+    // do use the same name as returned by toString() though, so that can be used to correlate
     // PushOnes with queue tasks despite their wrappers.
-    Set<String> prunableTaskNames = new HashSet<>();
-    for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
-      prunableTaskNames.addAll(destination.getPrunableTaskNames());
-    }
-
     for (WorkQueue.Task<?> task : workQueue.getTasks()) {
       WorkQueue.Task.State state = task.getState();
       if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
@@ -310,8 +323,7 @@
         return;
       }
       try {
-        firePendingEvents();
-        pruneCompleted();
+        synchronizePendingEvents(Prune.TRUE);
       } catch (Exception e) {
         repLog.atSevere().withCause(e).log("error distributing tasks");
       }