distributor: Reduce log level for no-op consolidations

It's expected that the replication distributor will generate many of
these log messages on a busy instance because it will repeatedly
re-schedule the same ref updates. Since those messages likely aren't
operationally interesting, reduce the log level to atFine when
scheduling is happening based on stored replication tasks and the ref is
already scheduled in the push task.

Change-Id: I07d071af848481fa84e1ba26fb8287c5e61d1d67
Release-Notes: Reduced log level for no-op distributor push consolidations
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 9cd0526..9a53aa3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -391,8 +391,23 @@
     schedule(project, ref, uri, state, false);
   }
 
+  void scheduleFromStorage(
+      Project.NameKey project, String ref, URIish uri, ReplicationState state) {
+    schedule(project, ref, uri, state, false, true);
+  }
+
   void schedule(
       Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+    schedule(project, ref, uri, state, now, false);
+  }
+
+  void schedule(
+      Project.NameKey project,
+      String ref,
+      URIish uri,
+      ReplicationState state,
+      boolean now,
+      boolean fromStorage) {
     if (!shouldReplicate(project, ref, state)) {
       repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
       return;
@@ -438,10 +453,14 @@
             "scheduled %s:%s => %s to run %s",
             project, ref, task, now ? "now" : "after " + config.getDelay() + "s");
       } else {
-        addRef(task, ref);
+        boolean added = addRef(task, ref);
         task.addState(ref, state);
-        repLog.atInfo().log(
-            "consolidated %s:%s => %s with an existing pending push", project, ref, task);
+        String message = "consolidated %s:%s => %s with an existing pending push";
+        if (added || !fromStorage) {
+          repLog.atInfo().log(message, project, ref, task);
+        } else {
+          repLog.atFine().log(message, project, ref, task);
+        }
       }
       state.increasePushTaskCount(project.get(), ref);
     }
@@ -477,9 +496,10 @@
         pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
   }
 
-  private void addRef(PushOne e, String ref) {
-    e.addRef(ref);
+  private boolean addRef(PushOne e, String ref) {
+    boolean added = e.addRef(ref);
     postReplicationScheduledEvent(e, ref);
+    return added;
   }
 
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 8c53028..c21b837 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -281,14 +281,20 @@
     return uri;
   }
 
-  void addRef(String ref) {
+  /** Returns true if the ref was not already included in the push and false otherwise */
+  boolean addRef(String ref) {
     if (ALL_REFS.equals(ref)) {
       delta.clear();
+      boolean pushAllRefsChanged = !pushAllRefs;
       pushAllRefs = true;
       repLog.atFinest().log("Added all refs for replication to %s", uri);
-    } else if (!pushAllRefs && delta.add(ref)) {
-      repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
+      return pushAllRefsChanged;
     }
+    if (!pushAllRefs && delta.add(ref)) {
+      repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
+      return true;
+    }
+    return false;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5310c14..ef75bad 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -161,10 +161,10 @@
     }
   }
 
-  private void fire(URIish uri, Project.NameKey project, String refName) {
+  private void fireFromStorage(URIish uri, Project.NameKey project, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
-      dest.schedule(project, refName, uri, state);
+      dest.scheduleFromStorage(project, refName, uri, state);
     }
     state.markAllPushTasksScheduled();
   }
@@ -201,7 +201,8 @@
 
   private void synchronizePendingEvents(Prune prune) {
     if (replaying.compareAndSet(false, true)) {
-      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+          new ConcurrentHashMap<>();
       if (Prune.TRUE.equals(prune)) {
         for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
           taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
@@ -214,7 +215,7 @@
             @Override
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
-                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+                fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
                 if (Prune.TRUE.equals(prune)) {
                   taskNamesByReplicateRefUpdate.remove(u);
                 }