Merge branch 'stable-3.8' into stable-3.10

* stable-3.8:
  PushOne: Remove unused addRef() method
  TasksStorage: Remove synchronized from methods
  Place the replaying flag clearing in a finally
  Demote delete errors when distributor is enabled
  distributor: Reduce log level for no-op consolidations

Change-Id: I15eb65d0f379a6e562d3c86e1089cfb7d9e6752c
Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 6054a4a..c186493 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@
     schedule(project, refs, uri, state, false);
   }
 
+  void scheduleFromStorage(
+      Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+    schedule(project, refs, uri, state, false, true);
+  }
+
   void schedule(
       Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+    schedule(project, refs, uri, state, now, false);
+  }
+
+  void schedule(
+      Project.NameKey project,
+      Set<String> refs,
+      URIish uri,
+      ReplicationState state,
+      boolean now,
+      boolean fromStorage) {
     ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder();
     for (String ref : refs) {
       if (!shouldReplicate(project, ref, state)) {
@@ -445,11 +460,14 @@
             "scheduled %s:%s => %s to run %s",
             project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
       } else {
-        task.addRefBatch(refsToSchedule);
+        boolean added = task.addRefBatch(refsToSchedule);
         task.addState(refsToSchedule, state);
-        repLog.atInfo().log(
-            "consolidated %s:%s => %s with an existing pending push",
-            project, refsToSchedule, task);
+        String message = "consolidated %s:%s => %s with an existing pending push";
+        if (added || !fromStorage) {
+          repLog.atInfo().log(message, project, refsToSchedule, task);
+        } else {
+          repLog.atFine().log(message, project, refsToSchedule, task);
+        }
       }
       for (String ref : refsToSchedule) {
         state.increasePushTaskCount(project.get(), ref);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 9d152f6..2b60145 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -289,18 +289,20 @@
     return uri;
   }
 
-  void addRef(String ref) {
-    addRefBatch(ImmutableSet.of(ref));
-  }
-
-  void addRefBatch(ImmutableSet<String> refBatch) {
+  /** Returns false if all refs were already included in the push, true otherwise */
+  boolean addRefBatch(ImmutableSet<String> refBatch) {
     if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
       refBatchesToPush.clear();
+      boolean pushAllRefsChanged = !pushAllRefs;
       pushAllRefs = true;
       repLog.atFinest().log("Added all refs for replication to %s", uri);
-    } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
-      repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+      return pushAllRefsChanged;
     }
+    if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+      repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+      return true;
+    }
+    return false;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 666eb64..267ba4e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -176,10 +176,10 @@
     }
   }
 
-  private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
+  private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
-      dest.schedule(project, refNames, uri, state);
+      dest.scheduleFromStorage(project, refNames, uri, state);
     }
     state.markAllPushTasksScheduled();
   }
@@ -237,7 +237,7 @@
             @Override
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
-                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
+                fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
                 if (Prune.TRUE.equals(prune)) {
                   taskNamesByReplicateRefUpdate.remove(u);
                 }
@@ -251,10 +251,13 @@
 
             @Override
             public void onDone() {
-              if (Prune.TRUE.equals(prune)) {
-                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+              try {
+                if (Prune.TRUE.equals(prune)) {
+                  pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+                }
+              } finally {
+                replaying.set(false);
               }
-              replaying.set(false);
             }
 
             @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index aa711b6..40f5278 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -168,11 +168,11 @@
     return isMultiPrimary;
   }
 
-  public synchronized String create(ReplicateRefUpdate r) {
+  public String create(ReplicateRefUpdate r) {
     return new Task(r).create();
   }
 
-  public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+  public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
     Set<ImmutableSet<String>> startedRefs = new HashSet<>();
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       Task t = new Task(update);
@@ -183,13 +183,13 @@
     return startedRefs;
   }
 
-  public synchronized void reset(UriUpdates uriUpdates) {
+  public void reset(UriUpdates uriUpdates) {
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       new Task(update).reset();
     }
   }
 
-  public synchronized void recoverAll() {
+  public void recoverAll() {
     streamRunning().forEach(r -> new Task(r).recover());
   }
 
@@ -401,7 +401,15 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        String message = "Error while deleting task %s";
+        if (isMultiPrimary() && e instanceof NoSuchFileException) {
+          logger.atFine().log(
+              message
+                  + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)",
+              taskKey);
+        } else {
+          logger.atSevere().withCause(e).log(message, taskKey);
+        }
       }
     }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 4670dff..aa29846 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -204,7 +204,7 @@
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
-    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
@@ -226,7 +226,7 @@
 
     PushOne pushOne = createPushOne(replicationPushFilter);
 
-    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
     pushOne.run();
 
     isCallFinished.await(10, TimeUnit.SECONDS);
@@ -353,7 +353,7 @@
     when(gitRepositoryManagerMock.openRepository(projectNameKey))
         .thenThrow(new RepositoryNotFoundException("not found"));
     PushOne pushOne = createPushOne(null);
-    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
     pushOne.setToRetry();
     pushOne.run();
     assertThat(pushOne.isRetrying()).isFalse();
@@ -365,7 +365,7 @@
             NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
     localRefs.add(barLocalRef);
 
-    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
     pushOne.run();
 
     isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);