Execute refs fetch in batches

If number of refs to fetch is greater then
replication.refsBatchSize, refs are going to be split into
a separate fetch operations.

Feature: Issue 11605
Change-Id: If5f192a95bf5f0945b503f66dcdc012ce71539eb
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index bf776e6..d4e22d9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -98,7 +98,7 @@
   FetchOne(
       GitRepositoryManager grm,
       Source s,
-      RemoteConfig c,
+      SourceConfiguration c,
       PerThreadRequestScope.Scoper ts,
       IdGenerator ig,
       ReplicationStateListeners sl,
@@ -108,7 +108,7 @@
       @Assisted URIish u) {
     gitManager = grm;
     pool = s;
-    config = c;
+    config = c.getRemoteConfig();
     threadScoper = ts;
     projectName = d;
     uri = u;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 1daf92b..e9bb620 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -59,9 +59,11 @@
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
 import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetchValidator;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchClientImplementation;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.JGitFetch;
 import java.io.IOException;
@@ -85,7 +87,6 @@
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
 
@@ -175,13 +176,14 @@
                 bind(PerThreadRequestScope.Propagator.class);
 
                 bind(Source.class).toInstance(Source.this);
-                bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
+                bind(SourceConfiguration.class).toInstance(config);
                 install(new FactoryModuleBuilder().build(FetchOne.Factory.class));
                 Class<? extends Fetch> clientClass =
                     cfg.useCGitClient() ? CGitFetch.class : JGitFetch.class;
                 install(
                     new FactoryModuleBuilder()
-                        .implement(Fetch.class, clientClass)
+                        .implement(Fetch.class, BatchFetchClient.class)
+                        .implement(Fetch.class, FetchClientImplementation.class, clientClass)
                         .build(FetchFactory.class));
               }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 01d46b1..e50461d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -51,6 +51,7 @@
   private final int maxRetries;
   private int slowLatencyThreshold;
   private boolean useCGitClient;
+  private int refsBatchSize;
 
   public SourceConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -76,6 +77,9 @@
     replicatePermissions = cfg.getBoolean("remote", name, "replicatePermissions", true);
     replicateHiddenProjects = cfg.getBoolean("remote", name, "replicateHiddenProjects", false);
     useCGitClient = cfg.getBoolean("replication", "useCGitClient", false);
+    refsBatchSize = cfg.getInt("replication", "refsBatchSize", 50);
+    if (refsBatchSize <= 0)
+      throw new IllegalArgumentException("refsBatchSize must be greater than zero");
     remoteNameStyle =
         MoreObjects.firstNonNull(cfg.getString("remote", name, "remoteNameStyle"), "slash");
     maxRetries =
@@ -173,6 +177,10 @@
     return useCGitClient;
   }
 
+  public int getRefsBatchSize() {
+    return refsBatchSize;
+  }
+
   @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
new file mode 100644
index 0000000..f2fbcd1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
@@ -0,0 +1,49 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.URIish;
+
+public class BatchFetchClient implements Fetch {
+  private int batchSize;
+  private Fetch fetchClient;
+
+  @Inject
+  public BatchFetchClient(
+      SourceConfiguration config,
+      FetchFactory factory,
+      @Assisted URIish uri,
+      @Assisted Repository git) {
+    this.batchSize = config.getRefsBatchSize();
+    this.fetchClient = factory.createPlainImpl(uri, git);
+  }
+
+  @Override
+  public List<RefUpdateState> fetch(List<RefSpec> refs) throws IOException {
+    List<RefUpdateState> results = Lists.newArrayList();
+    for (List<RefSpec> refsBatch : Lists.partition(refs, batchSize)) {
+      results.addAll(fetchClient.fetch(refsBatch));
+    }
+    return results;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
index 4f8abd6..9f055c8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -30,7 +31,6 @@
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 
 public class CGitFetch implements Fetch {
@@ -40,11 +40,11 @@
   private int timeout;
 
   @Inject
-  public CGitFetch(RemoteConfig config, @Assisted URIish uri, @Assisted Repository git) {
+  public CGitFetch(SourceConfiguration config, @Assisted URIish uri, @Assisted Repository git) {
 
     this.localProjectDirectory = git.getDirectory();
     this.uri = uri;
-    this.timeout = config.getTimeout();
+    this.timeout = config.getRemoteConfig().getTimeout();
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java
new file mode 100644
index 0000000..77317b7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@BindingAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+public @interface FetchClientImplementation {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
index 3d65594..d356477 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
@@ -19,4 +19,7 @@
 
 public interface FetchFactory {
   Fetch create(URIish uri, Repository git);
+  // Return implementation without any decorators
+  @FetchClientImplementation
+  Fetch createPlainImpl(URIish uri, Repository git);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
index 4467752..5de82f1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -19,6 +19,7 @@
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -39,11 +40,11 @@
 
   @Inject
   public JGitFetch(
-      RemoteConfig config,
+      SourceConfiguration sourceConfig,
       CredentialsFactory cpFactory,
       @Assisted URIish uri,
       @Assisted Repository git) {
-    this.config = config;
+    this.config = sourceConfig.getRemoteConfig();
     this.credentialsProvider = cpFactory.create(config.getName());
     this.uri = uri;
     this.git = git;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index cc0c0ee..c5d13a0 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -138,6 +138,15 @@
 
 	Default: false
 
+replication.refsBatchSize
+:	Number of refs that are fetched in a single fetch call.
+	If number of refs to fetch is greater then this param,
+	refs are going to be split into a separate fetch operations.
+
+	Value must be greater than zero.
+
+	Default: 50
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
index 8134ba4..a02f43f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
@@ -15,6 +15,11 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
@@ -30,17 +35,22 @@
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchClientImplementation;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.List;
 import java.util.function.Supplier;
 import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
 import org.eclipse.jgit.transport.RefSpec;
@@ -117,7 +127,6 @@
     try (Repository repo = repoManager.openRepository(project)) {
 
       Result pushResult = createChange();
-      RevCommit sourceCommit = pushResult.getCommit();
       String sourceRef = pushResult.getPatchSet().refName();
 
       Fetch objectUnderTest = fetchFactory.create(new URIish("/not_existing_path/"), repo);
@@ -160,6 +169,34 @@
   }
 
   @Test
+  public void shouldFetchMultipleRefsInMultipleBatches() throws Exception {
+    Config cf = new Config();
+    cf.setInt("remote", "test_config", "timeout", 0);
+    cf.setInt("replication", null, "refsBatchSize", 2);
+    URIish uri = new URIish(testRepoPath.toString());
+    List<RefUpdateState> fetchResultList =
+        Lists.newArrayList(new RefUpdateState("test_config", RefUpdate.Result.NEW));
+    RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
+    SourceConfiguration sourceConfig = new SourceConfiguration(remoteConfig, cf);
+
+    Repository repo = mock(Repository.class);
+    FetchFactory fetchFactory = mock(FetchFactory.class);
+    Fetch fetchClient = mock(Fetch.class);
+    when(fetchFactory.createPlainImpl(uri, repo)).thenReturn(fetchClient);
+    when(fetchClient.fetch(any())).thenReturn(fetchResultList);
+
+    Fetch objectUnderTest =
+        new BatchFetchClient(sourceConfig, fetchFactory, new URIish(testRepoPath.toString()), repo);
+
+    objectUnderTest.fetch(
+        Lists.newArrayList(
+            new RefSpec("refs/changes/01/1/1:refs/changes/01/1/1"),
+            new RefSpec("refs/changes/02/2/1:refs/changes/02/2/1"),
+            new RefSpec("refs/changes/03/3/1:refs/changes/03/3/1")));
+    verify(fetchClient, times(2)).fetch(any());
+  }
+
+  @Test
   public void shouldFetchNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);
@@ -232,10 +269,13 @@
       cf.setInt("remote", "test_config", "timeout", 0);
       try {
         RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
-        bind(RemoteConfig.class).toInstance(remoteConfig);
+        SourceConfiguration sourceConfig = new SourceConfiguration(remoteConfig, cf);
+
+        bind(SourceConfiguration.class).toInstance(sourceConfig);
         install(
             new FactoryModuleBuilder()
                 .implement(Fetch.class, CGitFetch.class)
+                .implement(Fetch.class, FetchClientImplementation.class, CGitFetch.class)
                 .build(FetchFactory.class));
       } catch (URISyntaxException e) {
         throw new RuntimeException(e);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
index c1c0981..3257fda 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
@@ -15,10 +15,13 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.plugins.replication.pull.Source.encode;
 import static com.googlesource.gerrit.plugins.replication.pull.Source.needsUrlEncoding;
 
 import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Test;
 
@@ -43,4 +46,14 @@
     assertThat(encode("name/with a space")).isEqualTo("name/with%20a%20space");
     assertThat(encode("name\nwith-LF")).isEqualTo("name%0Awith-LF");
   }
+
+  @Test
+  public void testRefsBatchSizeMustBeGreaterThanZero() throws URISyntaxException {
+    Config cf = new Config();
+    cf.setInt("remote", "test_config", "timeout", 0);
+    cf.setInt("replication", null, "refsBatchSize", 0);
+    RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
+
+    assertThrows(IllegalArgumentException.class, () -> new SourceConfiguration(remoteConfig, cf));
+  }
 }