Add possibility to split large pushes to remote into batches

Allow splitting a push with a large number of refs into smaller,
size-configurable batches in order to:
1. allow task cancellation - after each batch is pushed, the operation
   can be checked for whether it was requested to stop
2. set the push operation timeout more predictably - with this
   parameter set, one can specify more precisely when the timeout
   should occur
3. successfully push to remotes like GitHub or GitLab that enforce
   limits on the number of refs accepted in a single push operation

Note that when a failure occurs, all batches of the push are
re-attempted (as is already the case today), but refs that were already
pushed result in the 'UP_TO_DATE' status.

One can specify the batch size for all remotes with the
'gerrit.pushBatchSize' parameter of 'replication.config'. It can also be
overridden at remote level by setting `pushBatchSize`.

By default `0` is assumed, which means that there is no limit on the
number of refs in a single push operation (the current behaviour is
preserved). Note that negative values are treated as `0`.

Note that when 'Cluster Replication' is enabled, pushInBatches results
in more ERROR log entries (they also occur when pushInBatches is
disabled, but in lower numbers), such as:

  ERROR com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage : Error while deleting task...

Therefore pushInBatches is disabled in that case (even if a positive
value is given globally or for a specific remote) and a warning is
logged once per remote.

Bug: Issue 13216
Change-Id: I1514855cbb5ac0ea78397a17eb1c5926bf07a4a3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 400d6ff..00a46de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -774,6 +774,10 @@
     return config.getSlowLatencyThreshold();
   }
 
+  int getPushBatchSize() {
+    return config.getPushBatchSize();
+  }
+
   private static boolean matches(URIish uri, String urlMatch) {
     if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 1b39374..a74d198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.google.common.base.Suppliers.memoize;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.gerrit.server.config.ConfigUtil;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
@@ -45,6 +49,7 @@
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
   private final int slowLatencyThreshold;
+  private final Supplier<Integer> pushBatchSize;
 
   protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -84,6 +89,31 @@
                 "slowLatencyThreshold",
                 DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
                 TimeUnit.SECONDS);
+
+    pushBatchSize =
+        memoize(
+            () -> {
+              int configuredBatchSize =
+                  Math.max(
+                      0,
+                      getInt(
+                          remoteConfig,
+                          cfg,
+                          "pushBatchSize",
+                          cfg.getInt("gerrit", "pushBatchSize", 0)));
+              if (configuredBatchSize > 0) {
+                int distributionInterval = cfg.getInt("replication", "distributionInterval", 0);
+                if (distributionInterval > 0) {
+                  repLog.atWarning().log(
+                      "Push in batches cannot be turned on for remote (%s) when 'Cluster"
+                          + " Replication' (replication.distributionInterval) is configured",
+                      name);
+                  return 0;
+                }
+                return configuredBatchSize;
+              }
+              return 0;
+            });
   }
 
   @Override
@@ -173,4 +203,9 @@
   public int getSlowLatencyThreshold() {
     return slowLatencyThreshold;
   }
+
+  @Override
+  public int getPushBatchSize() {
+    return pushBatchSize.get();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 87c35ee..8b3c9e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
@@ -564,7 +566,36 @@
           lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
     }
 
-    return tn.push(NullProgressMonitor.INSTANCE, todo);
+    return pushInBatches(tn, todo);
+  }
+
+  private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo)
+      throws NotSupportedException, TransportException {
+    int batchSize = pool.getPushBatchSize();
+    if (batchSize == 0 || todo.size() <= batchSize) {
+      return tn.push(NullProgressMonitor.INSTANCE, todo);
+    }
+
+    List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize);
+    repLog.atInfo().log("Push to %s in %d batches", uri, batches.size());
+    AggregatedPushResult result = new AggregatedPushResult();
+    int completedBatch = 1; // 1-based index of the batch currently being pushed
+    for (List<RemoteRefUpdate> batch : batches) {
+      repLog.atInfo().log(
+          "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri);
+      result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch));
+
+      // Cancellation is only checked between batches: once seen, the remaining ones are skipped.
+      if (wasCanceled()) {
+        repLog.atInfo().log(
+            "Push for replication to %s was canceled after %d completed batch and thus won't be"
+                + " rescheduled",
+            uri, completedBatch);
+        break;
+      }
+      completedBatch++;
+    }
+    return result;
   }
 
   private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) {
@@ -836,4 +867,24 @@
       super(uri, message);
     }
   }
+
+  /**
+   * Collects the {@link PushResult} of every batch pushed by {@link PushOne#pushInBatches}. Only
+   * {@link #getRemoteUpdates()} is overridden to return the updates of all collected results.
+   */
+  private static class AggregatedPushResult extends PushResult {
+    private final List<PushResult> results = new ArrayList<>();
+
+    void addResult(PushResult result) {
+      results.add(result);
+    }
+
+    @Override
+    public Collection<RemoteRefUpdate> getRemoteUpdates() {
+      return results.stream()
+          .map(PushResult::getRemoteUpdates)
+          .flatMap(Collection::stream)
+          .collect(toList());
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index b66e73c..5fe0323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@
   int getSlowLatencyThreshold();
 
   /**
+   * Returns the maximum number of refs that can be pushed in a single push operation.
+   *
+   * @return batch size, zero if unlimited.
+   */
+  int getPushBatchSize();
+
+  /**
    * Whether the remote configuration is for a single project only
    *
    * @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 2d02022..5c8e2c7 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -148,6 +148,20 @@
 	`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
 
 
+gerrit.pushBatchSize
+:	Max number of refs that are pushed in a single push operation. If more
+	than `pushBatchSize` refs are to be pushed then they are divided into
+	batches and pushed sequentially one-by-one.
+
+	Can be overridden at remote level by setting `remote.NAME.pushBatchSize`.
+
+	By default, `0`, which means that there is no limit on the number of
+	refs to be transferred in a single push operation. Note that negative
+	values are treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured,
+	i.e. when `replication.distributionInterval` has a value > 0.
+
 gerrit.sshCommandTimeout
 :	Timeout for SSH command execution. If 0, there is no timeout and
 	the client waits indefinitely. By default, 0.
@@ -551,6 +565,19 @@
 
 	default: 15 minutes
 
+remote.NAME.pushBatchSize
+:	Max number of refs that are pushed in a single push operation to this
+	destination. If more than `pushBatchSize` are to be pushed then they are
+	divided into batches and pushed sequentially one-by-one.
+
+	By default it falls back to the `gerrit.pushBatchSize` value (which is `0`
+	if not set, meaning that there is no limit on the number of refs to be
+	transferred in a single push operation). Note that negative values are
+	treated as `0`.
+
+	Note that `pushBatchSize` is ignored when *Cluster Replication* is configured,
+	i.e. when `replication.distributionInterval` has a value > 0.
+
 Directory `replication`
 --------------------
 The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
new file mode 100644
index 0000000..646f915
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DestinationConfigurationTest {
+  private static final String REMOTE = "foo";
+
+  @Mock private RemoteConfig remoteConfigMock;
+  @Mock private Config cfgMock;
+
+  private DestinationConfiguration objectUnderTest;
+
+  @Before
+  public void setUp() {
+    when(remoteConfigMock.getName()).thenReturn(REMOTE);
+    when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(new String[] {});
+    objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock);
+  }
+
+  @Test
+  public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() {
+    // given
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() {
+    // given
+    int globalPushBatchSize = 1;
+    when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+    when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+        .thenReturn(globalPushBatchSize);
+
+    // when
+    int actual = objectUnderTest.getPushBatchSize();
+
+    // then
+    assertThat(actual).isEqualTo(globalPushBatchSize);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 94f0dc4..bb3e886 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +38,7 @@
 import com.google.gerrit.server.util.IdGenerator;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -115,7 +118,8 @@
         new ObjectIdRef.Unpeeled(
             NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
 
-    localRefs = Arrays.asList(newLocalRef);
+    localRefs = new ArrayList<>();
+    localRefs.add(newLocalRef);
 
     Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
     remoteRefs = new HashMap<>();
@@ -223,6 +227,52 @@
     verify(transportMock, never()).push(any(), any());
   }
 
+  @Test
+  public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
+      throws InterruptedException, IOException {
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock).push(any(), any());
+  }
+
+  @Test
+  public void shouldPushInBatchesWhenPushBatchSizeIsConfigured()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1);
+    replicateTwoRefs(createPushOne(null));
+    verify(transportMock, times(2)).push(any(), any());
+  }
+
+  @Test
+  public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled()
+      throws InterruptedException, IOException {
+    when(destinationMock.getPushBatchSize()).thenReturn(1);
+    PushOne pushOne = createPushOne(null);
+
+    // cancel replication during the first push
+    doAnswer(
+            invocation -> {
+              pushOne.setCanceledWhileRunning();
+              return new PushResult();
+            })
+        .when(transportMock)
+        .push(any(), any());
+
+    replicateTwoRefs(pushOne);
+    verify(transportMock, times(1)).push(any(), any());
+  }
+
+  private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
+    ObjectIdRef barLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
+    localRefs.add(barLocalRef);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+  }
+
   private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
     PushOne push =
         new PushOne(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 9606371..b6a3ed1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@
   }
 
   protected void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName,
+        Arrays.asList(replicaSuffix),
+        project,
+        TEST_REPLICATION_DELAY_SECONDS,
+        false,
+        Optional.ofNullable(pushBatchSize));
+  }
+
+  protected void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
       throws IOException {
     setReplicationDestination(
@@ -125,13 +137,37 @@
 
   protected void setReplicationDestination(
       String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize);
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
       boolean mirror)
       throws IOException {
     setReplicationDestination(
-        config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+        remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty());
+  }
+
+  protected void setReplicationDestination(
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
+      throws IOException {
+    setReplicationDestination(
+        config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize);
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
@@ -142,7 +178,8 @@
       Optional<String> project,
       int replicationDelay)
       throws IOException {
-    setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+    setReplicationDestination(
+        config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty());
   }
 
   protected void setReplicationDestination(
@@ -151,7 +188,8 @@
       List<String> replicaSuffixes,
       Optional<String> project,
       int replicationDelay,
-      boolean mirror)
+      boolean mirror,
+      Optional<Integer> pushBatchSize)
       throws IOException {
 
     List<String> replicaUrls =
@@ -163,6 +201,7 @@
     remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
     remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+    pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs));
     remoteConfig.save();
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a174e91..33bd91d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -348,6 +348,28 @@
     }
   }
 
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, 1);
+    reloadConfig();
+
+    // creating a change results in 2 refs creation therefore it already qualifies for push in two
+    // batches of size 1 each
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
   private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
     WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
new file mode 100644
index 0000000..cf8dbe3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    config.setInt("gerrit", null, "pushBatchSize", 1);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    // creating a change results in 2 refs creation therefore it already qualifies for push in two
+    // batches of size 1 each
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+}