Introduce `batch-fetch` REST API Endpoint

Similar to the `batch-apply-object` endpoint, there are the same
benefits if we batch the refs into one fetch call.

Whether to use the batching endpoint is configurable (false by default)
with the existing behaviour preserved for backwards compatibility.

Bug: Issue 40015567
Change-Id: Idb83e5cfcc4e8aae33c28f906518cad56ae9fc13
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index dac27aa..de15b17 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -338,7 +338,11 @@
       }
 
       if (!callSuccessful) {
-        callFetch(source, project, refs, state);
+        if (source.enableBatchedRefs()) {
+          callBatchFetch(source, project, refs, state);
+        } else {
+          callFetch(source, project, refs, state);
+        }
       }
     };
   }
@@ -361,7 +365,7 @@
     } catch (UncheckedIOException e) {
       stateLog.error("Falling back to calling fetch", e, state);
     }
-    return ((source) -> callFetch(source, project, refs, state));
+    return ((source) -> callBatchFetch(source, project, refs, state));
   }
 
   private BatchApplyObjectData toBatchApplyObject(
@@ -589,6 +593,74 @@
     return revisionDataBuilder.build();
   }
 
+  private boolean callBatchFetch(
+      Source source,
+      Project.NameKey project,
+      List<ReferenceUpdatedEvent> refs,
+      ReplicationState state) {
+
+    boolean resultIsSuccessful = true;
+
+    List<String> filteredRefs =
+        refs.stream()
+            .map(ReferenceUpdatedEvent::refName)
+            .filter(refName -> source.wouldFetchProject(project) && source.wouldFetchRef(refName))
+            .collect(Collectors.toList());
+
+    String refsStr = String.join(",", filteredRefs);
+    FetchApiClient fetchClient = fetchClientFactory.create(source);
+
+    for (String apiUrl : source.getApis()) {
+      try {
+        URIish uri = new URIish(apiUrl);
+        Optional<HttpResult> result = Optional.empty();
+        repLog.info(
+            "Pull replication REST API batch fetch to {} for {}:[{}]", apiUrl, project, refsStr);
+        Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName());
+        result = Optional.of(fetchClient.callBatchFetch(project, filteredRefs, uri));
+        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop());
+        boolean resultSuccessful = result.map(HttpResult::isSuccessful).orElse(false);
+        repLog.info(
+            "Pull replication REST API batch fetch to {} COMPLETED for {}:[{}], HTTP Result:"
+                + " {} - time:{} ms",
+            apiUrl,
+            project,
+            refsStr,
+            result,
+            elapsedMs);
+        if (!resultSuccessful
+            && result.map(r -> r.isProjectMissing(project)).orElse(false)
+            && source.isCreateMissingRepositories()) {
+          result = initProject(project, uri, fetchClient);
+          resultSuccessful = result.map(HttpResult::isSuccessful).orElse(false);
+        }
+        if (!resultSuccessful) {
+          stateLog.warn(
+              String.format(
+                  "Pull replication REST API batch fetch call failed. Endpoint url: %s, reason:%s",
+                  apiUrl, result.flatMap(HttpResult::getMessage).orElse("unknown")),
+              state);
+        }
+        resultIsSuccessful &= resultSuccessful;
+      } catch (URISyntaxException e) {
+        stateLog.error(
+            String.format("Cannot parse pull replication batch api url:%s", apiUrl), state);
+        resultIsSuccessful = false;
+      } catch (Exception e) {
+        stateLog.error(
+            String.format(
+                "Exception during the pull replication batch fetch rest api call. Endpoint url:%s,"
+                    + " message:%s",
+                apiUrl, e.getMessage()),
+            e,
+            state);
+        resultIsSuccessful = false;
+      }
+    }
+
+    return resultIsSuccessful;
+  }
+
   private boolean callFetch(
       Source source,
       Project.NameKey project,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java
new file mode 100644
index 0000000..c6ad47d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import java.util.ArrayList;
+import java.util.List;
+
+@Singleton
+public class BatchFetchAction implements RestModifyView<ProjectResource, List<Input>> {
+  private final FetchAction fetchAction;
+
+  @Inject
+  public BatchFetchAction(FetchAction fetchAction) {
+    this.fetchAction = fetchAction;
+  }
+
+  @Override
+  public Response<?> apply(ProjectResource resource, List<Input> inputs) throws RestApiException {
+
+    List<Response<?>> allResponses = new ArrayList<>();
+    for (Input input : inputs) {
+      Response<?> res = fetchAction.apply(resource, input);
+      allResponses.add(res);
+    }
+
+    return Response.ok(allResponses);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
index b6b8fd0..71cbf2e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -127,6 +127,7 @@
                 || requestURI.endsWith(String.format("/%s~apply-objects", pluginName))
                 || requestURI.endsWith(String.format("/%s~batch-apply-object", pluginName))
                 || requestURI.endsWith(String.format("/%s~fetch", pluginName))
+                || requestURI.endsWith(String.format("/%s~batch-fetch", pluginName))
                 || requestURI.endsWith(String.format("/%s~delete-project", pluginName))
                 || requestURI.contains(String.format("/%s/init-project/", pluginName))))
         || requestURI.matches(".*/projects/[^/]+/HEAD");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 5f39811..134ed59 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -40,6 +40,16 @@
     return callFetch(project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
   }
 
+  HttpResult callBatchFetch(
+      Project.NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      throws IOException;
+
+  default HttpResult callBatchFetch(
+      Project.NameKey project, List<String> refsInBatch, URIish targetUri) throws IOException {
+    return callBatchFetch(
+        project, refsInBatch, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
+  }
+
   HttpResult initProject(Project.NameKey project, URIish uri) throws IOException;
 
   HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 2b9ef09..d031b5c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -131,6 +131,31 @@
     return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
+  @Override
+  public HttpResult callBatchFetch(
+      NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      throws IOException {
+    String msgBody =
+        refsInBatch.stream()
+            .map(
+                refName -> {
+                  Boolean callAsync = !syncRefsFilter.match(refName);
+                  return String.format(
+                      "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
+                      instanceId, refName, callAsync);
+                })
+            .collect(Collectors.joining(","));
+
+    String url = formatUrl(targetUri.toString(), project, "batch-fetch");
+    HttpPost post = new HttpPost(url);
+    post.setEntity(new StringEntity("[" + msgBody + "]", StandardCharsets.UTF_8));
+    post.addHeader(new BasicHeader(CONTENT_TYPE, "application/json"));
+    post.addHeader(
+        PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS,
+        Long.toString(startTimeNanos));
+    return executeRequest(post, bearerTokenProvider.get(), targetUri);
+  }
+
   /* (non-Javadoc)
    * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#initProject(com.google.gerrit.entities.Project.NameKey, org.eclipse.jgit.transport.URIish)
    */
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index a5bffd3..26bcc12 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -101,6 +101,7 @@
   @Mock RevisionData revisionData;
   @Mock HttpResult successfulHttpResult;
   @Mock HttpResult fetchHttpResult;
+  @Mock HttpResult batchFetchHttpResult;
   @Mock RevisionData revisionDataWithParents;
   List<ObjectId> revisionDataParentObjectIds;
   @Mock HttpResult httpResult;
@@ -169,11 +170,13 @@
         .when(fetchRestApiClient.callBatchSendObject(any(), any(), anyLong(), any()))
         .thenReturn(batchHttpResult);
     when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(fetchHttpResult);
+    when(fetchRestApiClient.callBatchFetch(any(), any(), any())).thenReturn(batchFetchHttpResult);
     when(fetchRestApiClient.initProject(any(), any())).thenReturn(successfulHttpResult);
     when(successfulHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isSuccessful()).thenReturn(true);
     when(batchHttpResult.isSuccessful()).thenReturn(true);
     when(fetchHttpResult.isSuccessful()).thenReturn(true);
+    when(batchFetchHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isProjectMissing(any())).thenReturn(false);
     when(batchHttpResult.isProjectMissing(any())).thenReturn(false);
     when(applyObjectsRefsFilter.match(any())).thenReturn(false);
@@ -332,7 +335,7 @@
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenIOException()
+  public void shouldFallbackToCallBatchFetchWhenIOException()
       throws IOException, LargeObjectException, RefUpdateException {
     Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
@@ -341,11 +344,11 @@
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenLargeRef()
+  public void shouldFallbackToCallBatchFetchWhenLargeRef()
       throws IOException, LargeObjectException, RefUpdateException {
     Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
@@ -354,12 +357,12 @@
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
   public void
-      shouldFallbackToCallFetchWhenParentObjectIsMissingAndRefDoesntMatchApplyObjectsRefsFilter()
+      shouldFallbackToCallBatchFetchWhenParentObjectIsMissingAndRefDoesntMatchApplyObjectsRefsFilter()
           throws IOException {
     Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
@@ -369,7 +372,7 @@
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
@@ -391,7 +394,7 @@
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenSendBatchObjectNotAvailableAndApplyObjectFails()
+  public void shouldFallbackToCallBatchFetchWhenParentObjectNotMissingButApplyObjectFails()
       throws IOException {
     Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
@@ -403,7 +406,7 @@
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
@@ -462,6 +465,20 @@
   }
 
   @Test
+  public void shouldCallFetchIfBatchedRefsNotEnabledAtSource() throws IOException {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    when(source.enableBatchedRefs()).thenReturn(false);
+    when(httpResult.isSuccessful()).thenReturn(false);
+    when(httpResult.isParentObjectMissing()).thenReturn(false);
+
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient, never()).callBatchFetch(any(), any(), any());
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+  }
+
+  @Test
   public void shouldCallBatchFetchForAllTheRefsInTheBatchIfApplyObjectFails()
       throws IOException, URISyntaxException {
     Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/02/1/1");
@@ -476,9 +493,10 @@
     verify(fetchRestApiClient, times(2))
         .callSendObjects(any(), anyString(), anyLong(), any(), any());
     verify(fetchRestApiClient)
-        .callFetch(PROJECT, "refs/changes/01/1/1", new URIish("http://localhost:18080"));
-    verify(fetchRestApiClient)
-        .callFetch(PROJECT, "refs/changes/02/1/1", new URIish("http://localhost:18080"));
+        .callBatchFetch(
+            PROJECT,
+            List.of("refs/changes/01/1/1", "refs/changes/02/1/1"),
+            new URIish("http://localhost:18080"));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
new file mode 100644
index 0000000..738815a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.restapi.MergeConflictException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.project.ProjectResource;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BatchFetchActionTest {
+
+  BatchFetchAction batchFetchAction;
+  String label = "instance-2-label";
+  String master = "refs/heads/master";
+  String test = "refs/heads/test";
+
+  @Mock ProjectResource projectResource;
+  @Mock FetchAction fetchAction;
+
+  @Before
+  public void setup() {
+    batchFetchAction = new BatchFetchAction(fetchAction);
+  }
+
+  @Test
+  public void shouldDelegateToFetchActionForEveryFetchInput() throws RestApiException {
+    FetchAction.Input first = createInput(master);
+    FetchAction.Input second = createInput(test);
+
+    batchFetchAction.apply(projectResource, List.of(first, second));
+
+    verify(fetchAction).apply(projectResource, first);
+    verify(fetchAction).apply(projectResource, second);
+  }
+
+  @Test
+  public void shouldReturnOkResponseCodeWhenAllInputsAreProcessedSuccessfully()
+      throws RestApiException {
+    FetchAction.Input first = createInput(master);
+    FetchAction.Input second = createInput(test);
+
+    when(fetchAction.apply(any(), any()))
+        .thenAnswer((Answer<Response<?>>) invocation -> Response.accepted("some-url"));
+    Response<?> response = batchFetchAction.apply(projectResource, List.of(first, second));
+
+    assertThat(response.statusCode()).isEqualTo(SC_OK);
+  }
+
+  @Test
+  public void shouldReturnAListWithAllResponsesOnSuccess() throws RestApiException {
+    FetchAction.Input first = createInput(master);
+    FetchAction.Input second = createInput(test);
+    String masterUrl = "master-url";
+    String testUrl = "test-url";
+    Response.Accepted firstResponse = Response.accepted(masterUrl);
+    Response.Accepted secondResponse = Response.accepted(testUrl);
+
+    when(fetchAction.apply(projectResource, first))
+        .thenAnswer((Answer<Response<?>>) invocation -> firstResponse);
+    when(fetchAction.apply(projectResource, second))
+        .thenAnswer((Answer<Response<?>>) invocation -> secondResponse);
+    Response<?> response = batchFetchAction.apply(projectResource, List.of(first, second));
+
+    assertThat((List<Response<?>>) response.value())
+        .isEqualTo(List.of(firstResponse, secondResponse));
+  }
+
+  @Test
+  public void shouldReturnAMixOfSyncAndAsyncResponses() throws RestApiException {
+    FetchAction.Input async = createInput(master);
+    FetchAction.Input sync = createInput(test);
+    String masterUrl = "master-url";
+    Response.Accepted asyncResponse = Response.accepted(masterUrl);
+    Response<?> syncResponse = Response.created(sync);
+
+    when(fetchAction.apply(projectResource, async))
+        .thenAnswer((Answer<Response<?>>) invocation -> asyncResponse);
+    when(fetchAction.apply(projectResource, sync))
+        .thenAnswer((Answer<Response<?>>) invocation -> syncResponse);
+    Response<?> response = batchFetchAction.apply(projectResource, List.of(async, sync));
+
+    assertThat((List<Response<?>>) response.value())
+        .isEqualTo(List.of(asyncResponse, syncResponse));
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenProcessingFailsForAnInput() throws RestApiException {
+    FetchAction.Input first = createInput(master);
+    FetchAction.Input second = createInput(test);
+    String masterUrl = "master-url";
+
+    when(fetchAction.apply(projectResource, first))
+        .thenAnswer((Answer<Response<?>>) invocation -> Response.accepted(masterUrl));
+    when(fetchAction.apply(projectResource, second)).thenThrow(new MergeConflictException("BOOM"));
+
+    batchFetchAction.apply(projectResource, List.of(first, second));
+  }
+
+  private FetchAction.Input createInput(String refName) {
+    FetchAction.Input input = new FetchAction.Input();
+    input.label = label;
+    input.refName = refName;
+    return input;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
index 9b0106c..dc6a12f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -81,6 +81,11 @@
   }
 
   @Test
+  public void shouldAuthenticateWhenBatchFetch() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/pull-replication~batch-fetch", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
   public void shouldAuthenticateWhenApplyObject() throws ServletException, IOException {
     authenticateAndFilter("any-prefix/pull-replication~apply-object", NO_QUERY_PARAMETERS);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
index e64bc6f..c796034 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
@@ -169,6 +169,25 @@
   }
 
   @Test
+  public void shouldCallBatchFetchEndpoint() throws IOException, URISyntaxException {
+
+    objectUnderTest.callBatchFetch(
+        Project.nameKey("test_repo"),
+        List.of(refName, RefNames.REFS_HEADS + "test"),
+        new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
+    assertThat(httpPost.getURI().getPath())
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~batch-fetch", urlAuthenticationPrefix()));
+    assertAuthentication(httpPost);
+  }
+
+  @Test
   public void shouldByDefaultCallSyncFetchForAllRefs() throws IOException, URISyntaxException {
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -205,6 +224,41 @@
   }
 
   @Test
+  public void shouldCallAsyncBatchFetchForAllRefs() throws IOException, URISyntaxException {
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"NO_SYNC_REFS"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":true},"
+            + "{\"label\":\"Replication\", \"ref_name\": \""
+            + refs.get(1)
+            + "\", \"async\":true}"
+            + "]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
   public void shouldCallSyncFetchOnlyForMetaRef() throws IOException, URISyntaxException {
     String metaRefName = "refs/changes/01/101/meta";
     String expectedMetaRefPayload =
@@ -236,6 +290,33 @@
   }
 
   @Test
+  public void shouldCallSyncBatchFetchOnlyForMetaRef() throws IOException, URISyntaxException {
+    String metaRefName = "refs/changes/01/101/meta";
+    String expectedMetaRefPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \"" + metaRefName + "\", \"async\":false}]";
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/changes\\/.*\\/meta"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+
+    objectUnderTest.callBatchFetch(
+        Project.nameKey("test_repo"), List.of(metaRefName), new URIish(api));
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedMetaRefPayload);
+  }
+
+  @Test
   public void shouldCallFetchEndpointWithPayload() throws IOException, URISyntaxException {
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -247,6 +328,62 @@
   }
 
   @Test
+  public void shouldCallBatchFetchEndpointWithPayload() throws IOException, URISyntaxException {
+
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":false},"
+            + "{\"label\":\"Replication\", \"ref_name\": \""
+            + refs.get(1)
+            + "\", \"async\":false}"
+            + "]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
+  public void shouldCallBatchFetchWithAMixOfSyncAndAsyncRefs()
+      throws IOException, URISyntaxException {
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/heads\\/test"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":true},"
+            + "{\"label\":\"Replication\", \"ref_name\": \""
+            + refs.get(1)
+            + "\", \"async\":false}"
+            + "]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
   public void shouldSetContentTypeHeader() throws IOException, URISyntaxException {
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -259,6 +396,18 @@
   }
 
   @Test
+  public void shouldSetContentTypeHeaderInBatchFetch() throws IOException, URISyntaxException {
+
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), List.of(refName), new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getLastHeader("Content-Type").getValue())
+        .isEqualTo(expectedHeader.getValue());
+  }
+
+  @Test
   public void shouldCallSendObjectEndpoint() throws IOException, URISyntaxException {
 
     objectUnderTest.callSendObject(