Merge "Move HttpClientProvider to the ReplicationModule"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index 89b97e9..c61a123 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -85,7 +85,7 @@
       super(
           threadPool,
           stream.iterator(),
-          new ForwardingRunner<T>(runner) {
+          new ForwardingRunner<>(runner) {
             @Override
             public void onDone() {
               stream.close();
@@ -109,7 +109,7 @@
       try {
         runner.run(item);
       } catch (RuntimeException e) { // catch to prevent chain from breaking
-        logger.atSevere().withCause(e).log("Error while running: " + item);
+        logger.atSevere().withCause(e).log("Error while running: %s", item);
       }
       if (!scheduledNext) {
         runner.onDone();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 151e584..e5f125b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -270,6 +270,8 @@
   }
 
   private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+    // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
+    // values to loop over them.
     for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
       pushOneFunction.apply(pushOne);
     }
@@ -453,6 +455,7 @@
     synchronized (stateLock) {
       URIish uri = pushOp.getURI();
       pending.remove(uri);
+      pushOp.notifyNotAttempted(pushOp.getRefs());
     }
   }
 
@@ -629,7 +632,7 @@
       return true;
     }
 
-    boolean matches = (new ReplicationFilter(projects)).matches(project);
+    boolean matches = new ReplicationFilter(projects).matches(project);
     if (!matches) {
       repLog.atFine().log(
           "Skipping replication of project %s; does not match filter", project.get());
@@ -769,6 +772,10 @@
     return config.getSlowLatencyThreshold();
   }
 
+  int getPushBatchSize() {
+    return config.getPushBatchSize(); // no Destination-level logic; value comes from config
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 1b39374..a74d198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.base.Suppliers.memoize;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.gerrit.server.config.ConfigUtil;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
@@ -45,6 +49,7 @@
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
   private final int slowLatencyThreshold;
+  private final Supplier<Integer> pushBatchSize;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -84,6 +89,31 @@
                 "slowLatencyThreshold",
                 DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
                 TimeUnit.SECONDS);
+
+    pushBatchSize =
+        memoize(
+            () -> {
+              int configuredBatchSize =
+                  Math.max(
+                      0,
+                      getInt(
+                          remoteConfig,
+                          cfg,
+                          "pushBatchSize",
+                          cfg.getInt("gerrit", "pushBatchSize", 0)));
+              if (configuredBatchSize > 0) {
+                int distributionInterval = cfg.getInt("replication", "distributionInterval", 0);
+                if (distributionInterval > 0) {
+                  repLog.atWarning().log(
+                      "Push in batches cannot be turned on for remote (%s) when 'Cluster"
+                          + " Replication' (replication.distributionInterval) is configured",
+                      name);
+                  return 0;
+                }
+                return configuredBatchSize;
+              }
+              return 0;
+            });
   }
 
   @Override
@@ -173,4 +203,9 @@
   public int getSlowLatencyThreshold() {
     return slowLatencyThreshold;
   }
+
+  @Override
+  public int getPushBatchSize() {
+    return pushBatchSize.get(); // memoized supplier: computed on first call, then cached
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 87c35ee..8afcf9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
@@ -49,7 +51,6 @@
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.jcraft.jsch.JSchException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -453,10 +454,7 @@
     } catch (NotSupportedException e) {
       stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
     } catch (TransportException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
-        repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
-      } else if (e instanceof UpdateRefFailureException) {
+      if (e instanceof UpdateRefFailureException) {
         updateRefRetryCount++;
         repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
 
@@ -564,7 +562,36 @@
           lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
     }
 
-    return tn.push(NullProgressMonitor.INSTANCE, todo);
+    return pushInBatches(tn, todo);
+  }
+
+  private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo)
+      throws NotSupportedException, TransportException {
+    int batchSize = pool.getPushBatchSize(); // 0 (or less) means: do not split the push
+    if (batchSize <= 0 || todo.size() <= batchSize) {
+      return tn.push(NullProgressMonitor.INSTANCE, todo);
+    }
+
+    List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize);
+    repLog.atInfo().log("Push to %s in %d batches", uri, batches.size());
+    AggregatedPushResult result = new AggregatedPushResult();
+    int completedBatch = 1; // 1-based number of the batch being pushed
+    for (List<RemoteRefUpdate> batch : batches) {
+      repLog.atInfo().log(
+          "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri);
+      result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch));
+
+      // stop issuing further batches once this push operation has been canceled
+      if (wasCanceled()) {
+        repLog.atInfo().log(
+            "Push for replication to %s was canceled after %d completed batch and thus won't be"
+                + " rescheduled",
+            uri, completedBatch);
+        break;
+      }
+      completedBatch++;
+    }
+    return result;
   }
 
   private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) {
@@ -836,4 +863,24 @@
       super(uri, message);
     }
   }
+
+  /**
+   * Collects the {@link PushResult} of every push batch issued by {@link PushOne#pushInBatches}.
+   * Only {@link #getRemoteUpdates()} is overridden; it concatenates the per-batch ref updates.
+   */
+  private static class AggregatedPushResult extends PushResult {
+    private final List<PushResult> results = new ArrayList<>();
+
+    void addResult(PushResult result) {
+      results.add(result);
+    }
+
+    @Override
+    public Collection<RemoteRefUpdate> getRemoteUpdates() {
+      return results.stream()
+          .map(PushResult::getRemoteUpdates)
+          .flatMap(Collection::stream)
+          .collect(toList());
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 4f33937..bac599f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -30,7 +30,15 @@
 public interface PushResultProcessing {
   public static final PushResultProcessing NO_OP = new PushResultProcessing() {};
 
-  /** Invoked when a ref has been replicated to one node. */
+  /**
+   * Invoked when a ref has been replicated to one node.
+   *
+   * @param project the project name
+   * @param ref the ref name
+   * @param uri the URI
+   * @param status the status of the push
+   * @param refStatus the status for the ref
+   */
   default void onRefReplicatedToOneNode(
       String project,
       String ref,
@@ -38,10 +46,20 @@
       RefPushResult status,
       RemoteRefUpdate.Status refStatus) {}
 
-  /** Invoked when a ref has been replicated to all nodes */
+  /**
+   * Invoked when a ref has been replicated to all nodes.
+   *
+   * @param project the project name
+   * @param ref the ref name
+   * @param nodesCount the number of nodes
+   */
   default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
 
-  /** Invoked when all refs have been replicated to all nodes */
+  /**
+   * Invoked when all refs have been replicated to all nodes.
+   *
+   * @param totalPushTasksCount total number of push tasks
+   */
   default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
 
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index b66e73c..5fe0323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@
   int getSlowLatencyThreshold();
 
   /**
+   * Returns the maximum number of refs that can be pushed in a single push operation.
+   *
+   * @return batch size, zero if unlimited.
+   */
+  int getPushBatchSize();
+
+  /**
    * Whether the remote configuration is for a single project only
    *
    * @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 5458b6c..2424e71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -72,8 +72,8 @@
   }
 
   /**
-   * See
-   * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
+   * See {@link
+   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
    */
   @Override
   public boolean isReplicateAllOnPluginStart() {
@@ -81,8 +81,8 @@
   }
 
   /**
-   * See
-   * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
+   * See {@link
+   * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
    */
   @Override
   public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5310c14..791b387 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -201,7 +201,8 @@
 
   private void synchronizePendingEvents(Prune prune) {
     if (replaying.compareAndSet(false, true)) {
-      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+      final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+          new ConcurrentHashMap<>();
       if (Prune.TRUE.equals(prune)) {
         for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
           taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index d216366..deffe5e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -17,7 +17,7 @@
 share is mounted to which the repositories should be replicated.
 
 It is possible to
-[configure](config.html#configuring-cluster-replication) the plugin so
+[configure](config.md#configuring-cluster-replication) the plugin so
 that multiple primaries share the replication work approximately evenly.
 
 Replication of account data (NoteDb)
@@ -28,7 +28,6 @@
 
 * `refs/users/*` (user branches)
 * `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit replicas)
 * `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
-  slaves)
-
+  replicas)
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
index b6688e0..e815e08 100644
--- a/src/main/resources/Documentation/cmd-list.md
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -30,15 +30,15 @@
 -------
 
 `--remote <PATTERN>`
-:	Only print information for destinations whose remote name matches
-	the `PATTERN`.
+: Only print information for destinations whose remote name matches
+the `PATTERN`.
 
 `--detail`
-:	Print additional detailed information: AdminUrl, AuthGroup, Project
-	and queue (pending and in-flight).
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+and queue (pending and in-flight).
 
 `--json`
-:	Output in json format.
+: Output in json format.
 
 EXAMPLES
 --------
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index e12ec92..1a77c72 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -129,10 +129,10 @@
   $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
 ```
 
-Replicate projects whose path includes a folder named `vendor` to host slave1:
+Replicate projects whose path includes a folder named `vendor` to host replica1:
 
 ```
-  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url replica1 ^(|.*/)vendor(|/.*)
 ```
 
 Replicate to only one specific destination URL:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7fe9e15..d401030 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,9 +18,9 @@
   ssh-keygen -m PEM -t rsa -C "your_email@example.com"
 ```
 
-<a name="example_file">
+<a name="example_file"></a>
 Next, create `$site_path/etc/replication.config` as a Git-style config
-file, for example to replicate in parallel to four different hosts:</a>
+file, for example to replicate in parallel to four different hosts:
 
 ```
   [remote "host-one"]
@@ -148,6 +148,20 @@
 	`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
 
 
+gerrit.pushBatchSize
+:	Max number of refs that are pushed in a single push operation. If more
+	than `pushBatchSize` refs are to be pushed, they are divided into batches
+	that are pushed sequentially, one batch at a time.
+
+	Can be overridden at remote level by setting `remote.NAME.pushBatchSize`.
+
+	By default, `0`, which means that there are no limitations on number of
+	refs to be transferred in a single push operation. Note that negative
+	values are treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured,
+	i.e. when `replication.distributionInterval` has a value > 0.
+
 gerrit.sshCommandTimeout
 :	Timeout for SSH command execution. If 0, there is no timeout and
 	the client waits indefinitely. By default, 0.
@@ -496,7 +510,8 @@
 remote.NAME.remoteNameStyle
 :	Provides possibilities to influence the name of the target
 	repository, e.g. by replacing slashes in the `${name}`
-	placeholder.
+	placeholder, when the target remote repository is not served
+	by Gerrit.
 
 	Github and Gitorious do not permit slashes "/" in repository
 	names and will change them to dashes "-" at repository creation
@@ -511,6 +526,14 @@
 	Gerrit server, e.g. `${name}` of `foo/bar/my-repo.git` would
 	be `my-repo`.
 
+	> **NOTE**: The use of repository name translation using `remoteNameStyle`
+	> may lead to dangerous situations if there are multiple repositories
+	> that may be mapped to the same target name. For instance,
+	> mapping `/foo/my-repo.git` to `my-repo` using "basenameOnly"
+	> would also map `/bar/my-repo.git` to the same `my-repo`, leading
+	> to conflicts where commits can be lost between the two repositories
+	> replicating to the same target `my-repo`.
+
 	By default, "slash", i.e. remote names will contain slashes as
 	they do in Gerrit.
 
@@ -551,6 +574,19 @@
 
 	default: 15 minutes
 
+remote.NAME.pushBatchSize
+:	Max number of refs that are pushed in a single push operation to this
+	destination. If more than `pushBatchSize` refs are to be pushed, they are
+	divided into batches that are pushed sequentially, one batch at a time.
+
+	By default it falls back to `gerrit.pushBatchSize` value (which is `0` if
+	not set, which means that there are no limitations on number of refs to
+	be transferred in a single push operation). Note that negative values are
+	treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured,
+	i.e. when `replication.distributionInterval` has a value > 0.
+
 Directory `replication`
 --------------------
 The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
index 076aded..83e7e45 100644
--- a/src/main/resources/Documentation/extension-point.md
+++ b/src/main/resources/Documentation/extension-point.md
@@ -2,7 +2,7 @@
 ==============
 
 The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
-Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](../../../Documentation/config-gerrit.html#gerrit.installModule) and
 implemented by another plugin by declaring a `provided` dependency from the replication plugin.
 
 ### Install extension libModule
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index ef8b150..db11d23 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -1,4 +1,5 @@
-# Metrics
+Metrics
+=======
 
 Some metrics are emitted when replication occurs to a remote destination.
 The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
@@ -7,17 +8,21 @@
 The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
 could potentially be considerably big.
 
-### Project level
+Project level
+-------------
 
 * plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
 
-### Destination level
+Destination level
+-----------------
 
 * plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
 * plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
 * plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
 
-### Example
+Example
+-------
+
 ```
 # HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
 # TYPE plugins_replication_replication_delay_destination summary
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
index e7339d9..725052c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -82,7 +82,7 @@
   }
 
   private Provider<ReplicationConfig> newVersionConfigProvider() {
-    return new Provider<ReplicationConfig>() {
+    return new Provider<>() {
       @Override
       public ReplicationConfig get() {
         return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 90191f2..242922d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -19,9 +19,9 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -367,7 +367,7 @@
     WaitingRunner runner = new WaitingRunner();
 
     int batchSize = 5; // how many tasks are started concurrently
-    Queue<CountDownLatch> batches = new LinkedList<>();
+    Queue<CountDownLatch> batches = new ArrayDeque<>();
     for (int b = 0; b < blockSize; b++) {
       batches.add(executeWaitingRunnableBatch(batchSize, executor));
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
new file mode 100644
index 0000000..646f915
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DestinationConfigurationTest {
+  private static final String REMOTE = "foo";
+
+  @Mock private RemoteConfig remoteConfigMock;
+  @Mock private Config cfgMock;
+
+  private DestinationConfiguration objectUnderTest;
+
+  @Before
+  public void setUp() {
+    when(remoteConfigMock.getName()).thenReturn(REMOTE);
+    when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(new String[] {});
+    objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock);
+  }
+
+  @Test
+  public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given: a remote-level pushBatchSize and cluster replication (distributionInterval > 0)
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when: the batch size is resolved lazily, i.e. only now that the stubs are in place
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then: batching is reported as disabled
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given: a global gerrit.pushBatchSize and cluster replication (distributionInterval > 0)
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then: batching is reported as disabled
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given: a remote-level pushBatchSize, cluster replication not configured
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then: the remote-level value is returned
+    assertThat(actual).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given: a global gerrit.pushBatchSize, cluster replication not configured
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then: the global value is returned
+    assertThat(actual).isEqualTo(globalPushBatchSize);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index a1f61fe..5e6bde8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@
           return implementedByOverrider;
         }
       } catch (NoSuchMethodException | SecurityException e) {
+        return null;
       }
       return null;
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 94f0dc4..cfe9002 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +38,7 @@
 import com.google.gerrit.server.util.IdGenerator;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,7 +118,8 @@
         new ObjectIdRef.Unpeeled(
             NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
 
-    localRefs = Arrays.asList(newLocalRef);
+    localRefs = new ArrayList<>();
+    localRefs.add(newLocalRef);
 
     Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
     remoteRefs = new HashMap<>();
@@ -223,6 +227,52 @@
     verify(transportMock, never()).push(any(), any());
   }
 
+  @Test
+  public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
+      throws InterruptedException, IOException {
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock).push(any(), any()); // exactly one push carries both refs
+  }
+
+  @Test
+  public void shouldPushInBatchesWhenPushBatchSizeIsConfigured()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1); // one ref per batch
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock, times(2)).push(any(), any()); // two refs -> two pushes
+  }
+
+  @Test
+  public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1); // one ref per batch
+    PushOne pushOne = createPushOne(null);
+
+    // cancel the operation from inside the first push so that the second batch is never sent
+    doAnswer(
+            invocation -> {
+              pushOne.setCanceledWhileRunning();
+              return new PushResult();
+            })
+        .when(transportMock)
+        .push(any(), any());
+
+    replicateTwoRefs(pushOne);
+    verify(transportMock, times(1)).push(any(), any()); // only the first batch was pushed
+  }
+
+  private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
+    ObjectIdRef barLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
+    localRefs.add(barLocalRef); // second local ref, in addition to the existing "foo"
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+  }
+
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
     PushOne push =
         new PushOne(
@@ -272,7 +322,7 @@
               @Override
               public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
                 Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
-                return new Callable<Object>() {
+                return new Callable<>() {
 
                   @Override
                   public Object call() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 9606371..b6a3ed1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@
   }
 
   protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName,
+        Arrays.asList(replicaSuffix),
+        project,
+        TEST_REPLICATION_DELAY_SECONDS,
+        false,
+        Optional.ofNullable(pushBatchSize));
+  }
+
+  protected void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
       throws IOException {
     setReplicationDestination(
@@ -125,13 +137,37 @@
 
   protected void setReplicationDestination(
       String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
       boolean mirror)
       throws IOException {
     setReplicationDestination(
-        config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+        remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty());
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize);
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
@@ -142,7 +178,8 @@
       Optional<String> project,
       int replicationDelay)
       throws IOException {
-    setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+    setReplicationDestination(
+        config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty());
   }
 
   protected void setReplicationDestination(
@@ -151,7 +188,8 @@
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
-      boolean mirror)
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
       throws IOException {
 
     List<String> replicaUrls =
@@ -163,6 +201,7 @@
     remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
     remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+    pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs));
     remoteConfig.save();
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
index 4cd55f9..b32829c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -384,6 +384,7 @@
     }
   }
 
+  @SuppressWarnings("deprecation")
   private boolean equals(ReplicationScheduledEvent scheduledEvent, Object other) {
     if (!(other instanceof ReplicationScheduledEvent)) {
       return false;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a174e91..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@
   }
 
   @Test
+  public void pushAllWait() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    future.get();
+    state.waitForReplication();
+  }
+
+  @Test
+  public void pushAllWaitCancelNotRunningTask() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Executor service = Executors.newSingleThreadExecutor();
+    service.execute(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              future.get();
+              state.waitForReplication();
+              latch.countDown();
+            } catch (Exception e) {
+              // Intentionally ignored: the latch is not counted down, so the test fails.
+            }
+          }
+        });
+
+    // Cancel the replication task
+    waitUntil(() -> getProjectTasks().size() != 0);
+    WorkQueue.Task<?> task = getProjectTasks().get(0);
+    assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+    task.cancel(false);
+
+    // Confirm our waiting thread completed
+    boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+    assertThat(receivedSignal).isTrue();
+  }
+
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(t -> t instanceof WorkQueue.ProjectTask)
+        .collect(Collectors.toList());
+  }
+
+  @Test
   public void shouldReplicateHeadUpdate() throws Exception {
     setReplicationDestination("foo", "replica", ALL_PROJECTS);
     reloadConfig();
@@ -348,6 +426,28 @@
     }
   }
 
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, 1);
+    reloadConfig();
+
+    // Creating a change creates 2 refs, so it already qualifies for a push in two batches
+    // of size 1 each.
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
   private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
new file mode 100644
index 0000000..cf8dbe3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    config.setInt("gerrit", null, "pushBatchSize", 1);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    // Creating a change creates 2 refs, so it already qualifies for a push in two batches
+    // of size 1 each.
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
index 901200b..5881ea7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -22,14 +22,14 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefEvent;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class TestDispatcher implements EventDispatcher {
-  private final List<ProjectEvent> projectEvents = new LinkedList<>();
-  private final List<RefEvent> refEvents = new LinkedList<>();
-  private final List<Event> events = new LinkedList<>();
+  private final List<ProjectEvent> projectEvents = new ArrayList<>();
+  private final List<RefEvent> refEvents = new ArrayList<>();
+  private final List<Event> events = new ArrayList<>();
 
   @Override
   public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index f61114e..080f279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.entities.Project;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
@@ -34,7 +35,7 @@
 
   public static TestUriUpdates create(
       Project.NameKey project, URIish uri, String remote, Set<String> refs) {
-    return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+    return new AutoValue_TestUriUpdates(project, uri, remote, ImmutableSet.copyOf(refs));
   }
 
   @Override
@@ -47,5 +48,5 @@
   public abstract String getRemoteName();
 
   @Override
-  public abstract Set<String> getRefs();
+  public abstract ImmutableSet<String> getRefs();
 }