Merge branch 'stable-3.4'

* stable-3.4:
  Ensure states are updated for canceled replication tasks

Change-Id: Ifc514a151819f5d809a1834d382857baf334a8a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index 89b97e9..c61a123 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -85,7 +85,7 @@
       super(
           threadPool,
           stream.iterator(),
-          new ForwardingRunner<T>(runner) {
+          new ForwardingRunner<>(runner) {
             @Override
             public void onDone() {
               stream.close();
@@ -109,7 +109,7 @@
       try {
         runner.run(item);
       } catch (RuntimeException e) { // catch to prevent chain from breaking
-        logger.atSevere().withCause(e).log("Error while running: " + item);
+        logger.atSevere().withCause(e).log("Error while running: %s", item);
       }
       if (!scheduledNext) {
         runner.onDone();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index baf0328..a6a162b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.gerrit.entities.AccountGroup;
@@ -219,7 +218,7 @@
 
   private void addRecursiveParents(
       AccountGroup.UUID g,
-      Builder<AccountGroup.UUID> builder,
+      ImmutableSet.Builder<AccountGroup.UUID> builder,
       GroupIncludeCache groupIncludeCache) {
     for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) {
       if (builder.build().contains(p)) {
@@ -276,6 +275,8 @@
   }
 
   private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+    // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
+    // values to loop over them.
     for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
       pushOneFunction.apply(pushOne);
     }
@@ -636,7 +637,7 @@
       return true;
     }
 
-    boolean matches = (new ReplicationFilter(projects)).matches(project);
+    boolean matches = new ReplicationFilter(projects).matches(project);
     if (!matches) {
       repLog.atFine().log(
           "Skipping replication of project %s; does not match filter", project.get());
@@ -776,6 +777,10 @@
     return config.getSlowLatencyThreshold();
   }
 
+  int getPushBatchSize() {
+    return config.getPushBatchSize();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 1b39374..a74d198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.base.Suppliers.memoize;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.gerrit.server.config.ConfigUtil;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
@@ -45,6 +49,7 @@
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
   private final int slowLatencyThreshold;
+  private final Supplier<Integer> pushBatchSize;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -84,6 +89,31 @@
                 "slowLatencyThreshold",
                 DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
                 TimeUnit.SECONDS);
+
+    pushBatchSize =
+        memoize(
+            () -> {
+              int configuredBatchSize =
+                  Math.max(
+                      0,
+                      getInt(
+                          remoteConfig,
+                          cfg,
+                          "pushBatchSize",
+                          cfg.getInt("gerrit", "pushBatchSize", 0)));
+              if (configuredBatchSize > 0) {
+                int distributionInterval = cfg.getInt("replication", "distributionInterval", 0);
+                if (distributionInterval > 0) {
+                  repLog.atWarning().log(
+                      "Push in batches cannot be turned on for remote (%s) when 'Cluster"
+                          + " Replication' (replication.distributionInterval) is configured",
+                      name);
+                  return 0;
+                }
+                return configuredBatchSize;
+              }
+              return 0;
+            });
   }
 
   @Override
@@ -173,4 +203,9 @@
   public int getSlowLatencyThreshold() {
     return slowLatencyThreshold;
   }
+
+  @Override
+  public int getPushBatchSize() {
+    return pushBatchSize.get();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 747c0f6..4957a64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -35,7 +35,6 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.replication.Destination.Factory;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -50,7 +49,7 @@
 public class DestinationsCollection implements ReplicationDestinations {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final Factory destinationFactory;
+  private final Destination.Factory destinationFactory;
   private final Provider<ReplicationQueue> replicationQueue;
   private volatile List<Destination> destinations;
   private boolean shuttingDown;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
index a347f3a..09a632d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
@@ -22,7 +22,6 @@
   /**
    * Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle
    *
-   * @param throwable
    * @return a boolean true if the throwable or a cause in its causal chain is a Stale NFS File
    *     Handle
    */
@@ -39,7 +38,6 @@
   /**
    * Determine if an IOException is a Stale NFS File Handle
    *
-   * @param ioe
    * @return a boolean true if the IOException is a Stale NFS FIle Handle
    */
   public static boolean isStaleFileHandle(IOException ioe) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 87c35ee..8afcf9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
@@ -49,7 +51,6 @@
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.jcraft.jsch.JSchException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -453,10 +454,7 @@
     } catch (NotSupportedException e) {
       stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
     } catch (TransportException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
-        repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
-      } else if (e instanceof UpdateRefFailureException) {
+      if (e instanceof UpdateRefFailureException) {
         updateRefRetryCount++;
         repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
 
@@ -564,7 +562,36 @@
           lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
     }
 
-    return tn.push(NullProgressMonitor.INSTANCE, todo);
+    return pushInBatches(tn, todo);
+  }
+
+  private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo)
+      throws NotSupportedException, TransportException {
+    int batchSize = pool.getPushBatchSize();
+    if (batchSize == 0 || todo.size() <= batchSize) {
+      return tn.push(NullProgressMonitor.INSTANCE, todo);
+    }
+
+    List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize);
+    repLog.atInfo().log("Push to %s in %d batches", uri, batches.size());
+    AggregatedPushResult result = new AggregatedPushResult();
+    int completedBatch = 1;
+    for (List<RemoteRefUpdate> batch : batches) {
+      repLog.atInfo().log(
+          "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri);
+      result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch));
+
+      //  check if push should be no longer continued
+      if (wasCanceled()) {
+        repLog.atInfo().log(
+            "Push for replication to %s was canceled after %d completed batch and thus won't be"
+                + " rescheduled",
+            uri, completedBatch);
+        break;
+      }
+      completedBatch++;
+    }
+    return result;
   }
 
   private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) {
@@ -836,4 +863,24 @@
       super(uri, message);
     }
   }
+
+  /**
+   * Internal class used to aggregate PushResult objects from all push batches. See {@link
+   * PushOne#pushInBatches} for usage.
+   */
+  private static class AggregatedPushResult extends PushResult {
+    private final List<PushResult> results = new ArrayList<>();
+
+    void addResult(PushResult result) {
+      results.add(result);
+    }
+
+    @Override
+    public Collection<RemoteRefUpdate> getRemoteUpdates() {
+      return results.stream()
+          .map(PushResult::getRemoteUpdates)
+          .flatMap(Collection::stream)
+          .collect(toList());
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 5450dd5..bac599f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -33,11 +33,11 @@
   /**
    * Invoked when a ref has been replicated to one node.
    *
-   * @param project
-   * @param ref
-   * @param uri
-   * @param status
-   * @param refStatus
+   * @param project the project name
+   * @param ref the ref name
+   * @param uri the URI
+   * @param status the status of the push
+   * @param refStatus the status for the ref
    */
   default void onRefReplicatedToOneNode(
       String project,
@@ -49,16 +49,16 @@
   /**
    * Invoked when a ref has been replicated to all nodes.
    *
-   * @param project
-   * @param ref
-   * @param nodesCount
+   * @param project the project name
+   * @param ref the ref name
+   * @param nodesCount the number of nodes
    */
   default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
 
   /**
    * Invoked when all refs have been replicated to all nodes.
    *
-   * @param totalPushTasksCount
+   * @param totalPushTasksCount total number of push tasks
    */
   default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index b66e73c..5fe0323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@
   int getSlowLatencyThreshold();
 
   /**
+   * Returns the maximum number of refs that can be pushed in a single push operation.
+   *
+   * @return batch size, zero if unlimited.
+   */
+  int getPushBatchSize();
+
+  /**
    * Whether the remote configuration is for a single project only
    *
    * @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index 18ccc66..2fa6c34 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -54,7 +54,7 @@
    */
   List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
 
-  /** @return true if there are no destinations, false otherwise. */
+  /** Returns true if there are no destinations, false otherwise. */
   boolean isEmpty();
 
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index f5c6185..2424e71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -71,16 +71,18 @@
     return null;
   }
 
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()
+  /**
+   * See {@link
+   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
    */
   @Override
   public boolean isReplicateAllOnPluginStart() {
     return replicateAllOnPluginStart;
   }
 
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()
+  /**
+   * See {@link
+   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
    */
   @Override
   public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4abb295..791b387 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -201,7 +201,8 @@
 
   private void synchronizePendingEvents(Prune prune) {
     if (replaying.compareAndSet(false, true)) {
-      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+          new ConcurrentHashMap<>();
       if (Prune.TRUE.equals(prune)) {
         for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
           taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
@@ -229,7 +230,7 @@
             @Override
             public void onDone() {
               if (Prune.TRUE.equals(prune)) {
-                pruneNoLongerPending(taskNamesByReplicateRefUpdate.values());
+                pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
               }
               replaying.set(false);
             }
@@ -242,7 +243,7 @@
     }
   }
 
-  private void pruneNoLongerPending(Collection<String> prunableTaskNames) {
+  private void pruneNoLongerPending(Set<String> prunableTaskNames) {
     // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
     // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
     // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 871ed52..fa65803 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -140,12 +140,12 @@
   }
 
   private RefReplicationStatus getRefStatus(String project, String ref) {
-    if (!statusByProjectRef.contains(project, ref)) {
-      RefReplicationStatus refStatus = new RefReplicationStatus(project, ref);
+    RefReplicationStatus refStatus = statusByProjectRef.get(project, ref);
+    if (refStatus == null) {
+      refStatus = new RefReplicationStatus(project, ref);
       statusByProjectRef.put(project, ref, refStatus);
-      return refStatus;
     }
-    return statusByProjectRef.get(project, ref);
+    return refStatus;
   }
 
   public void waitForReplication() throws InterruptedException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index fa8b44c..f63df98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -60,7 +60,6 @@
     }
 
     ReplicationState state = new ReplicationState(new CommandProcessing(this));
-    Future<?> future = null;
 
     ReplicationFilter projectFilter;
 
@@ -70,7 +69,8 @@
       projectFilter = new ReplicationFilter(projectPatterns);
     }
 
-    future = pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+    Future<?> future =
+        pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
 
     if (wait) {
       if (future != null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
index b0c554e..c8347d1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -16,22 +16,16 @@
 
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
 import java.util.Objects;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 import org.eclipse.jgit.transport.URIish;
 
-public class RefReplicatedEvent extends RefEvent {
+public class RefReplicatedEvent extends RemoteRefReplicationEvent {
   public static final String TYPE = "ref-replicated";
 
-  public final String project;
-  public final String ref;
   @Deprecated public final String targetNode;
-  public final String targetUri;
-  public final String status;
   public final Status refStatus;
 
   public RefReplicatedEvent(
@@ -40,23 +34,9 @@
       URIish targetUri,
       RefPushResult status,
       RemoteRefUpdate.Status refStatus) {
-    super(TYPE);
-    this.project = project;
-    this.ref = ref;
+    super(TYPE, project, ref, targetUri, status.toString());
     this.targetNode = resolveNodeName(targetUri);
-    this.status = status.toString();
     this.refStatus = refStatus;
-    this.targetUri = targetUri.toASCIIString();
-  }
-
-  @Override
-  public Project.NameKey getProjectNameKey() {
-    return Project.nameKey(project);
-  }
-
-  @Override
-  public String getRefName() {
-    return ref;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java
new file mode 100644
index 0000000..3950c3f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import org.eclipse.jgit.transport.URIish;
+
+public class RemoteRefReplicationEvent extends RefEvent {
+
+  public final String project;
+  public final String ref;
+  public final String status;
+  public final String targetUri;
+
+  public RemoteRefReplicationEvent(
+      String type, String project, String ref, URIish targetUri, @Nullable String status) {
+    super(type);
+    this.project = project;
+    this.ref = ref;
+    this.status = status;
+    this.targetUri = targetUri.toASCIIString();
+  }
+
+  @Override
+  public Project.NameKey getProjectNameKey() {
+    return Project.nameKey(project);
+  }
+
+  @Override
+  public String getRefName() {
+    return ref;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
index 4a1ade8..a952777 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -17,23 +17,16 @@
 import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
 
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
 import org.eclipse.jgit.transport.URIish;
 
-public class ReplicationScheduledEvent extends RefEvent {
+public class ReplicationScheduledEvent extends RemoteRefReplicationEvent {
   public static final String TYPE = "ref-replication-scheduled";
 
-  public final String project;
-  public final String ref;
   @Deprecated public final String targetNode;
-  public final String targetUri;
 
   public ReplicationScheduledEvent(String project, String ref, URIish targetUri) {
-    super(TYPE);
-    this.project = project;
-    this.ref = ref;
+    super(TYPE, project, ref, targetUri, null);
     this.targetNode = resolveNodeName(targetUri);
-    this.targetUri = targetUri.toASCIIString();
   }
 
   @Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index ded9d84..deffe5e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,11 +16,9 @@
 local path as replication target. This makes e.g. sense if a network
 share is mounted to which the repositories should be replicated.
 
-In multi-primary scenario, any replication work which is already
-in-flight or completed by the other nodes is not performed to
-avoid extra work. This is because, the storage for replication
-events is shared between multiple primaries.(The storage location
-is specified in the config using: `replication.eventsDirectory`).
+It is possible to
+[configure](config.md#configuring-cluster-replication) the plugin so
+that multiple primaries share the replication work approximately evenly.
 
 Replication of account data (NoteDb)
 ------------------------------------
@@ -30,7 +28,6 @@
 
 * `refs/users/*` (user branches)
 * `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit replicas)
 * `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
-  slaves)
-
+  replicas)
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
index b6688e0..e815e08 100644
--- a/src/main/resources/Documentation/cmd-list.md
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -30,15 +30,15 @@
 -------
 
 `--remote <PATTERN>`
-:	Only print information for destinations whose remote name matches
-	the `PATTERN`.
+: Only print information for destinations whose remote name matches
+the `PATTERN`.
 
 `--detail`
-:	Print additional detailed information: AdminUrl, AuthGroup, Project
-	and queue (pending and in-flight).
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+and queue (pending and in-flight).
 
 `--json`
-:	Output in json format.
+: Output in json format.
 
 EXAMPLES
 --------
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index e12ec92..1a77c72 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -129,10 +129,10 @@
   $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
 ```
 
-Replicate projects whose path includes a folder named `vendor` to host slave1:
+Replicate projects whose path includes a folder named `vendor` to host replica1:
 
 ```
-  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url replica1 ^(|.*/)vendor(|/.*)
 ```
 
 Replicate to only one specific destination URL:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index af91032..d401030 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,9 +18,9 @@
   ssh-keygen -m PEM -t rsa -C "your_email@example.com"
 ```
 
-<a name="example_file">
+<a name="example_file"></a>
 Next, create `$site_path/etc/replication.config` as a Git-style config
-file, for example to replicate in parallel to four different hosts:</a>
+file, for example to replicate in parallel to four different hosts:
 
 ```
   [remote "host-one"]
@@ -46,6 +46,59 @@
 To manually trigger replication at runtime, see
 SSH command [start](cmd-start.md).
 
+<a name="configuring-cluster-replication"></a>
+Configuring Cluster Replication
+-------------------------------
+
+The replication plugin is designed to allow multiple primaries in a
+cluster to efficiently cooperate together via the replication event
+persistence subsystem. To enable this cooperation, the directory
+pointed to by the replication.eventsDirectory config key must reside on
+a shared filesystem, such as NFS. By default, simply pointing multiple
+primaries to the same eventsDirectory will enable some cooperation by
+preventing the same replication push from being duplicated by more
+than one primary.
+
+To further improve cooperation across the cluster, the
+replication.distributionInterval config value can be set. With
+distribution enabled, the replication queues for all the nodes sharing
+the same eventsDirectory will reflect approximately the same outstanding
+replication work (i.e. tasks waiting in the queue). Replication pushes
+which are running will continue to only be visible in the queue of the
+node on which the push is actually happening. This feature helps
+administrators get a cluster wide view of outstanding replication
+tasks, while allowing replication tasks triggered by one primary to be
+fulfilled by another node which is less busy.
+
+This enhanced replication work distribution allows the amount of
+replication work a cluster can handle to scale more evenly and linearly
+with the amount of primaries in the cluster. Adding more nodes to a
+cluster without distribution enabled will generally not allow the thread
+count per remote to be reduced without impacting service levels to those
+remotes. This is because without distribution, all events triggered by a
+node will only be fulfilled by the node which triggered the event, even
+if all the other nodes in the cluster are idle. This behavior implies
+that each node should be configured in a way that allows it alone to
+provide the level of service which each remote requires. However, with
+distribution enabled, it becomes possible to reduce the amount of
+replication threads configured per remote proportionally to the amount
+of nodes in the cluster, while maintaining the same approximate service
+level as before adding new nodes.
+
+Threads per remote reduction without service impacts is possible with
+distribution, because when configuring a node it can be expected that
+other nodes will pick up some of the work it triggers. Then the node no
+longer needs to be configured as if it were the only node in the
+cluster. For example, if a remote requires 6 threads with one node to
+achieve acceptable service, it should only take 2 threads on 3
+equivalently powered nodes to provide the same service level with
+distribution enabled. Scaling down such thread requirements per remote
+results in a reduced memory footprint per remote on each node in the
+cluster. This enables the nodes in the cluster to now scale to handle
+more remotes with the approximate same service level than without
+distribution. The amount of extra supported remotes then also scales
+approximately linearly with the extra nodes in a cluster.
+
 File `replication.config`
 -------------------------
 
@@ -95,6 +148,20 @@
 	`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
 
 
+gerrit.pushBatchSize
+:	Max number of refs that are pushed in a single push operation. If more
+	than pushBatchSize are to be pushed then they are divided into batches
+	and pushed sequentially one-by-one.
+
+	Can be overridden at remote-level by setting pushBatchSize.
+
+	By default, `0`, which means that there are no limitations on number of
+	refs to be transferred in a single push operation. Note that negative
+	values are treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+	- when `replication.distributionInterval` has value > 0.
+
 gerrit.sshCommandTimeout
 :	Timeout for SSH command execution. If 0, there is no timeout and
 	the client waits indefinitely. By default, 0.
@@ -507,6 +574,19 @@
 
 	default: 15 minutes
 
+remote.NAME.pushBatchSize
+:	Max number of refs that are pushed in a single push operation to this
+	destination. If more than `pushBatchSize` are to be pushed then they are
+	divided into batches and pushed sequentially one-by-one.
+
+	By default it falls back to `gerrit.pushBatchSize` value (which is `0` if
+	not set, which means that there are no limitations on number of refs to
+	be transferred in a single push operation). Note that negative values are
+	treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+	- when `replication.distributionInterval` has value > 0.
+
 Directory `replication`
 --------------------
 The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
index 076aded..83e7e45 100644
--- a/src/main/resources/Documentation/extension-point.md
+++ b/src/main/resources/Documentation/extension-point.md
@@ -2,7 +2,7 @@
 ==============
 
 The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
-Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](../../../Documentation/config-gerrit.html#gerrit.installModule) and
 implemented by another plugin by declaring a `provided` dependency from the replication plugin.
 
 ### Install extension libModule
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index ef8b150..db11d23 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -1,4 +1,5 @@
-# Metrics
+Metrics
+=======
 
 Some metrics are emitted when replication occurs to a remote destination.
 The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
@@ -7,17 +8,21 @@
 The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
 could potentially be considerably big.
 
-### Project level
+Project level
+-------------
 
 * plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
 
-### Destination level
+Destination level
+-----------------
 
 * plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
 * plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
 * plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
 
-### Example
+Example
+-------
+
 ```
 # HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
 # TYPE plugins_replication_replication_delay_destination summary
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
index e7339d9..725052c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -82,7 +82,7 @@
   }
 
   private Provider<ReplicationConfig> newVersionConfigProvider() {
-    return new Provider<ReplicationConfig>() {
+    return new Provider<>() {
       @Override
       public ReplicationConfig get() {
         return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 90191f2..242922d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -19,9 +19,9 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -367,7 +367,7 @@
     WaitingRunner runner = new WaitingRunner();
 
     int batchSize = 5; // how many tasks are started concurrently
-    Queue<CountDownLatch> batches = new LinkedList<>();
+    Queue<CountDownLatch> batches = new ArrayDeque<>();
     for (int b = 0; b < blockSize; b++) {
       batches.add(executeWaitingRunnableBatch(batchSize, executor));
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
new file mode 100644
index 0000000..646f915
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DestinationConfigurationTest {
+  private static final String REMOTE = "foo";
+
+  @Mock private RemoteConfig remoteConfigMock;
+  @Mock private Config cfgMock;
+
+  private DestinationConfiguration objectUnderTest;
+
+  @Before
+  public void setUp() {
+    when(remoteConfigMock.getName()).thenReturn(REMOTE);
+    when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(new String[] {});
+    objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock);
+  }
+
+  @Test
+  public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(globalPushBatchSize);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index a1f61fe..5e6bde8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@
           return implementedByOverrider;
         }
       } catch (NoSuchMethodException | SecurityException e) {
+        return null;
       }
       return null;
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 94f0dc4..cfe9002 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +38,7 @@
 import com.google.gerrit.server.util.IdGenerator;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,7 +118,8 @@
         new ObjectIdRef.Unpeeled(
             NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
 
-    localRefs = Arrays.asList(newLocalRef);
+    localRefs = new ArrayList<>();
+    localRefs.add(newLocalRef);
 
     Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
     remoteRefs = new HashMap<>();
@@ -223,6 +227,52 @@
     verify(transportMock, never()).push(any(), any());
   }
 
+  @Test
+  public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
+      throws InterruptedException, IOException {
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock).push(any(), any());
+  }
+
+  @Test
+  public void shouldPushInBatchesWhenPushBatchSizeIsConfigured()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1);
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock, times(2)).push(any(), any());
+  }
+
+  @Test
+  public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1);
+    PushOne pushOne = createPushOne(null);
+
+    // cancel replication during the first push
+    doAnswer(
+            invocation -> {
+              pushOne.setCanceledWhileRunning();
+              return new PushResult();
+            })
+        .when(transportMock)
+        .push(any(), any());
+
+    replicateTwoRefs(pushOne);
+    verify(transportMock, times(1)).push(any(), any());
+  }
+
+  private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
+    ObjectIdRef barLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
+    localRefs.add(barLocalRef);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+  }
+
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
     PushOne push =
         new PushOne(
@@ -272,7 +322,7 @@
               @Override
               public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
                 Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
-                return new Callable<Object>() {
+                return new Callable<>() {
 
                   @Override
                   public Object call() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 9606371..b6a3ed1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@
   }
 
   protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName,
+        Arrays.asList(replicaSuffix),
+        project,
+        TEST_REPLICATION_DELAY_SECONDS,
+        false,
+        Optional.ofNullable(pushBatchSize));
+  }
+
+  protected void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
       throws IOException {
     setReplicationDestination(
@@ -125,13 +137,37 @@
 
   protected void setReplicationDestination(
       String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
       boolean mirror)
       throws IOException {
     setReplicationDestination(
-        config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+        remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty());
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize);
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
@@ -142,7 +178,8 @@
       Optional<String> project,
       int replicationDelay)
       throws IOException {
-    setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+    setReplicationDestination(
+        config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty());
   }
 
   protected void setReplicationDestination(
@@ -151,7 +188,8 @@
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
-      boolean mirror)
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
       throws IOException {
 
     List<String> replicaUrls =
@@ -163,6 +201,7 @@
     remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
     remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+    pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs));
     remoteConfig.save();
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
index 5ade68d..3d0dfc1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import java.time.Duration;
@@ -89,8 +90,9 @@
     reloadConfig();
 
     String newBranch = "refs/heads/foo_branch";
-    createBranch(project, "refs/heads/master", newBranch);
+    createBranch(BranchNameKey.create(project, newBranch));
 
+    assertThat(listWaitingReplicationTasks(newBranch)).hasSize(1);
     deleteWaitingReplicationTasks(newBranch); // This simulates the work being started by other node
 
     assertThat(waitForProjectTaskCount(0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS)))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
index 2d69a47..b32829c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Objects;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
@@ -42,11 +43,13 @@
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.net.URISyntaxException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
@@ -308,6 +311,34 @@
     assertThat(origEvent).isEqualTo(gotEvent);
   }
 
+  @Test
+  public void shouldSerializeRefReplicatedEvent() throws URISyntaxException {
+    RefReplicatedEvent origEvent =
+        new RefReplicatedEvent(
+            project.get(),
+            "refs/heads/master",
+            new URIish(String.format("git://someHost/%s.git", project.get())),
+            ReplicationState.RefPushResult.SUCCEEDED,
+            RemoteRefUpdate.Status.OK);
+
+    assertThat(origEvent)
+        .isEqualTo(eventGson.fromJson(eventGson.toJson(origEvent), RefReplicatedEvent.class));
+  }
+
+  @Test
+  public void shouldSerializeReplicationScheduledEvent() throws URISyntaxException {
+    ReplicationScheduledEvent origEvent =
+        new ReplicationScheduledEvent(
+            project.get(),
+            "refs/heads/master",
+            new URIish(String.format("git://someHost/%s.git", project.get())));
+
+    assertTrue(
+        equals(
+            origEvent,
+            eventGson.fromJson(eventGson.toJson(origEvent), ReplicationScheduledEvent.class)));
+  }
+
   private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName)
       throws InterruptedException {
     WaitUtil.waitUntil(
@@ -352,4 +383,25 @@
       return Objects.hashCode(event);
     }
   }
+
+  @SuppressWarnings("deprecation")
+  private boolean equals(ReplicationScheduledEvent scheduledEvent, Object other) {
+    if (!(other instanceof ReplicationScheduledEvent)) {
+      return false;
+    }
+    ReplicationScheduledEvent event = (ReplicationScheduledEvent) other;
+    if (!Objects.equal(event.project, scheduledEvent.project)) {
+      return false;
+    }
+    if (!Objects.equal(event.ref, scheduledEvent.ref)) {
+      return false;
+    }
+    if (!Objects.equal(event.targetUri, scheduledEvent.targetUri)) {
+      return false;
+    }
+    if (!Objects.equal(event.status, scheduledEvent.status)) {
+      return false;
+    }
+    return Objects.equal(event.targetNode, scheduledEvent.targetNode);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 17c8933..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -426,6 +426,28 @@
     }
   }
 
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, 1);
+    reloadConfig();
+
+    // creating a change results in 2 refs creation therefore it already qualifies for push in two
+    // batches of size 1 each
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
   private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
new file mode 100644
index 0000000..cf8dbe3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    config.setInt("gerrit", null, "pushBatchSize", 1);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    // creating a change results in 2 refs creation therefore it already qualifies for push in two
+    // batches of size 1 each
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
index 901200b..5881ea7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -22,14 +22,14 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefEvent;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class TestDispatcher implements EventDispatcher {
-  private final List<ProjectEvent> projectEvents = new LinkedList<>();
-  private final List<RefEvent> refEvents = new LinkedList<>();
-  private final List<Event> events = new LinkedList<>();
+  private final List<ProjectEvent> projectEvents = new ArrayList<>();
+  private final List<RefEvent> refEvents = new ArrayList<>();
+  private final List<Event> events = new ArrayList<>();
 
   @Override
   public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index f61114e..080f279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.entities.Project;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
@@ -34,7 +35,7 @@
 
   public static TestUriUpdates create(
       Project.NameKey project, URIish uri, String remote, Set<String> refs) {
-    return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+    return new AutoValue_TestUriUpdates(project, uri, remote, ImmutableSet.copyOf(refs));
   }
 
   @Override
@@ -47,5 +48,5 @@
   public abstract String getRemoteName();
 
   @Override
-  public abstract Set<String> getRefs();
+  public abstract ImmutableSet<String> getRefs();
 }