Merge branch 'stable-3.5' into stable-3.8

* stable-3.5:
  TasksStorage: Remove synchronized from methods
  Place the replaying flag clearing in a finally
  Demote delete errors when distributor is enabled
  distributor: Reduce log level for no-op consolidations

Change-Id: Iaa6d4a59d5cb26879500f2d36852eaa30efc5d7b
Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 04af9db..1f01e2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@
     schedule(project, refs, uri, state, false);
   }
 
+  void scheduleFromStorage(
+      Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+    schedule(project, refs, uri, state, false, true);
+  }
+
   void schedule(
       Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+    schedule(project, refs, uri, state, now, false);
+  }
+
+  void schedule(
+      Project.NameKey project,
+      Set<String> refs,
+      URIish uri,
+      ReplicationState state,
+      boolean now,
+      boolean fromStorage) {
     Set<String> refsToSchedule = new HashSet<>();
     for (String ref : refs) {
       if (!shouldReplicate(project, ref, state)) {
@@ -443,11 +458,14 @@
             "scheduled %s:%s => %s to run %s",
             project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
       } else {
-        addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+        boolean added = addRefs(task, ImmutableSet.copyOf(refsToSchedule));
         task.addState(refsToSchedule, state);
-        repLog.atInfo().log(
-            "consolidated %s:%s => %s with an existing pending push",
-            project, refsToSchedule, task);
+        String message = "consolidated %s:%s => %s with an existing pending push";
+        if (added || !fromStorage) {
+          repLog.atInfo().log(message, project, refsToSchedule, task);
+        } else {
+          repLog.atFine().log(message, project, refsToSchedule, task);
+        }
       }
       for (String ref : refsToSchedule) {
         state.increasePushTaskCount(project.get(), ref);
@@ -486,9 +504,10 @@
         pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
   }
 
-  private void addRefs(PushOne e, ImmutableSet<String> refs) {
-    e.addRefBatch(refs);
+  private boolean addRefs(PushOne e, ImmutableSet<String> refs) {
+    boolean added = e.addRefBatch(refs);
     postReplicationScheduledEvent(e, refs);
+    return added;
   }
 
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 6feba66..f9947ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -287,14 +287,20 @@
     return uri;
   }
 
-  void addRefBatch(ImmutableSet<String> refBatch) {
+  /** Returns false if all refs were already included in the push, true otherwise */
+  boolean addRefBatch(ImmutableSet<String> refBatch) {
     if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
       refBatchesToPush.clear();
+      boolean pushAllRefsChanged = !pushAllRefs;
       pushAllRefs = true;
       repLog.atFinest().log("Added all refs for replication to %s", uri);
-    } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
-      repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+      return pushAllRefsChanged;
     }
+    if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+      repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+      return true;
+    }
+    return false;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5691ac2..aa00634 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -175,10 +175,10 @@
     }
   }
 
-  private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
+  private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
-      dest.schedule(project, refNames, uri, state);
+      dest.scheduleFromStorage(project, refNames, uri, state);
     }
     state.markAllPushTasksScheduled();
   }
@@ -236,7 +236,7 @@
             @Override
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
-                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
+                fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
                 if (Prune.TRUE.equals(prune)) {
                   taskNamesByReplicateRefUpdate.remove(u);
                 }
@@ -250,10 +250,13 @@
 
             @Override
             public void onDone() {
-              if (Prune.TRUE.equals(prune)) {
-                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+              try {
+                if (Prune.TRUE.equals(prune)) {
+                  pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+                }
+              } finally {
+                replaying.set(false);
               }
-              replaying.set(false);
             }
 
             @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3f92983..2e4b619 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -167,11 +167,11 @@
     return isMultiPrimary;
   }
 
-  public synchronized String create(ReplicateRefUpdate r) {
+  public String create(ReplicateRefUpdate r) {
     return new Task(r).create();
   }
 
-  public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+  public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
     Set<ImmutableSet<String>> startedRefs = new HashSet<>();
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       Task t = new Task(update);
@@ -182,13 +182,13 @@
     return startedRefs;
   }
 
-  public synchronized void reset(UriUpdates uriUpdates) {
+  public void reset(UriUpdates uriUpdates) {
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       new Task(update).reset();
     }
   }
 
-  public synchronized void recoverAll() {
+  public void recoverAll() {
     streamRunning().forEach(r -> new Task(r).recover());
   }
 
@@ -400,7 +400,15 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        String message = "Error while deleting task %s";
+        if (isMultiPrimary() && e instanceof NoSuchFileException) {
+          logger.atFine().log(
+              message
+                  + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)",
+              taskKey);
+        } else {
+          logger.atSevere().withCause(e).log(message, taskKey);
+        }
       }
     }