Merge branch 'stable-3.8'

* stable-3.8:
  Do not rely on async/wait for synchronous fetch replication
  Cover the replication failure scenario and fix the metrics
  Remove white-box unit tests on synchronous FetchCommand
  Throw Exception from tests

Change-Id: Ie67000a99bcbfd49eea7e21448920ffb91f6397e
diff --git a/BUILD b/BUILD
index dbbc7e9..53b55dc 100644
--- a/BUILD
+++ b/BUILD
@@ -18,6 +18,7 @@
     deps = [
         ":events-broker-neverlink",
         "//lib/commons:io",
+        "//plugins/delete-project",
         "//plugins/replication",
         "@commons-lang3//jar",
     ],
@@ -34,6 +35,7 @@
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
         ":pull_replication_util",
+        "//plugins/delete-project",
         "//plugins/replication",
         "//plugins/events-broker",
     ],
@@ -63,6 +65,7 @@
     ),
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
+        "//plugins/delete-project",
         "//plugins/replication",
     ],
 )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
index 22bb073..13be628 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
@@ -87,16 +87,6 @@
   }
 
   /**
-   * Start the end-to-end replication latency timer from a source.
-   *
-   * @param name the source name.
-   * @return the timer context.
-   */
-  public Timer1.Context<String> startEnd2End(String name) {
-    return end2EndExecutionTime.start(name);
-  }
-
-  /**
    * Record the end-to-end replication latency timer from a source.
    *
    * @param name the source name.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 005d383..cbdfe1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -53,7 +53,6 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule;
-import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.io.File;
@@ -94,8 +93,6 @@
     install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
     install(new ApplyObjectCacheModule());
 
-    install(new FetchRefReplicatedEventModule());
-
     install(
         new FactoryModuleBuilder()
             .implement(HttpClient.class, SourceHttpClient.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 1ba47de..e612efa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -26,6 +26,9 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1.Context;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -35,15 +38,17 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpResultUtils;
 import com.googlesource.gerrit.plugins.replication.pull.filter.ApplyObjectsRefsFilter;
 import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -55,12 +60,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
-import org.apache.http.client.ClientProtocolException;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.CorruptObjectException;
 import org.eclipse.jgit.errors.IncorrectObjectTypeException;
-import org.eclipse.jgit.errors.InvalidObjectIdException;
 import org.eclipse.jgit.errors.MissingObjectException;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -78,6 +83,7 @@
   static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
 
   private static final Integer DEFAULT_FETCH_CALLS_TIMEOUT = 0;
+  private static final String BATCH_REF_UPDATED_EVENT_TYPE = BatchRefUpdateEvent.TYPE;
   private static final String REF_UDPATED_EVENT_TYPE = new RefUpdatedEvent().type;
   private static final String ZEROS_OBJECTID = ObjectId.zeroId().getName();
   private final ReplicationStateListener stateLog;
@@ -88,15 +94,15 @@
   private final Provider<SourcesCollection> sources; // For Guice circular dependency
   private volatile boolean running;
   private volatile boolean replaying;
-  private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private final Queue<ReferenceBatchUpdatedEvent> beforeStartupEventsQueue;
   private FetchApiClient.Factory fetchClientFactory;
   private Integer fetchCallsTimeout;
   private ExcludedRefsFilter refsFilter;
   private Provider<RevisionReader> revReaderProvider;
   private final ApplyObjectMetrics applyObjectMetrics;
-  private final FetchReplicationMetrics fetchMetrics;
   private final ReplicationQueueMetrics queueMetrics;
   private final String instanceId;
+  private final boolean useBatchUpdateEvents;
   private ApplyObjectsRefsFilter applyObjectsRefsFilter;
 
   @Inject
@@ -109,9 +115,9 @@
       ExcludedRefsFilter refsFilter,
       Provider<RevisionReader> revReaderProvider,
       ApplyObjectMetrics applyObjectMetrics,
-      FetchReplicationMetrics fetchMetrics,
       ReplicationQueueMetrics queueMetrics,
       @GerritInstanceId String instanceId,
+      @GerritServerConfig Config gerritConfig,
       ApplyObjectsRefsFilter applyObjectsRefsFilter,
       ShutdownState shutdownState) {
     workQueue = wq;
@@ -124,9 +130,10 @@
     this.refsFilter = refsFilter;
     this.revReaderProvider = revReaderProvider;
     this.applyObjectMetrics = applyObjectMetrics;
-    this.fetchMetrics = fetchMetrics;
     this.queueMetrics = queueMetrics;
     this.instanceId = instanceId;
+    this.useBatchUpdateEvents =
+        gerritConfig.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false);
     this.applyObjectsRefsFilter = applyObjectsRefsFilter;
   }
 
@@ -170,18 +177,65 @@
 
   @Override
   public void onEvent(com.google.gerrit.server.events.Event e) {
-    if (e.type.equals(REF_UDPATED_EVENT_TYPE) && instanceId.equals(e.instanceId)) {
+    if (!instanceId.equals(e.instanceId)) {
+      return;
+    }
+
+    if (useBatchUpdateEvents) {
+      if (e.type.equals(BATCH_REF_UPDATED_EVENT_TYPE)) {
+        BatchRefUpdateEvent event = (BatchRefUpdateEvent) e;
+        repLog.info(
+            "Batch ref event received on project {} for refs: {}",
+            event.getProjectNameKey().get(),
+            String.join(",", event.getRefNames()));
+
+        long eventCreatedOn = e.eventCreatedOn;
+        List<ReferenceUpdatedEvent> refs =
+            event.refUpdates.get().stream()
+                .filter(u -> isRefToBeReplicated(u.refName))
+                .map(
+                    u -> {
+                      repLog.info(
+                          "Ref event received: {} on project {}:{} - {} => {}",
+                          refUpdateType(u.oldRev, u.newRev),
+                          event.getProjectNameKey().get(),
+                          u.refName,
+                          u.oldRev,
+                          u.newRev);
+                      return ReferenceUpdatedEvent.from(u, eventCreatedOn);
+                    })
+                .sorted(ReplicationQueue::sortByMetaRefAsLast)
+                .collect(Collectors.toList());
+
+        if (!refs.isEmpty()) {
+          ReferenceBatchUpdatedEvent referenceBatchUpdatedEvent =
+              ReferenceBatchUpdatedEvent.create(
+                  event.getProjectNameKey().get(), refs, eventCreatedOn);
+          fire(referenceBatchUpdatedEvent);
+        }
+      }
+      return;
+    }
+
+    if (e.type.equals(REF_UDPATED_EVENT_TYPE)) {
       RefUpdatedEvent event = (RefUpdatedEvent) e;
 
       if (isRefToBeReplicated(event.getRefName())) {
+        RefUpdateAttribute refUpdateAttribute = event.refUpdate.get();
         repLog.info(
             "Ref event received: {} on project {}:{} - {} => {}",
-            refUpdateType(event),
-            event.refUpdate.get().project,
-            event.getRefName(),
-            event.refUpdate.get().oldRev,
-            event.refUpdate.get().newRev);
-        fire(ReferenceUpdatedEvent.from(event));
+            refUpdateType(refUpdateAttribute.oldRev, refUpdateAttribute.newRev),
+            event.getProjectNameKey().get(),
+            refUpdateAttribute.refName,
+            refUpdateAttribute.oldRev,
+            refUpdateAttribute.newRev);
+
+        ReferenceBatchUpdatedEvent referenceBatchUpdatedEvent =
+            ReferenceBatchUpdatedEvent.create(
+                event.getProjectNameKey().get(),
+                List.of(ReferenceUpdatedEvent.from(refUpdateAttribute, e.eventCreatedOn)),
+                e.eventCreatedOn);
+        fire(referenceBatchUpdatedEvent);
       }
     }
   }
@@ -196,10 +250,16 @@
                 source.getApis().forEach(apiUrl -> source.scheduleDeleteProject(apiUrl, project)));
   }
 
-  private static String refUpdateType(RefUpdatedEvent event) {
-    if (ZEROS_OBJECTID.equals(event.refUpdate.get().oldRev)) {
+  private static int sortByMetaRefAsLast(ReferenceUpdatedEvent a, ReferenceUpdatedEvent b) {
+    repLog.debug("sortByMetaRefAsLast({} <=> {})", a.refName(), b.refName());
+    return Boolean.compare(
+        RefNames.isNoteDbMetaRef(a.refName()), RefNames.isNoteDbMetaRef(b.refName()));
+  }
+
+  private static String refUpdateType(String oldRev, String newRev) {
+    if (ZEROS_OBJECTID.equals(oldRev)) {
       return "CREATE";
-    } else if (ZEROS_OBJECTID.equals(event.refUpdate.get().newRev)) {
+    } else if (ZEROS_OBJECTID.equals(newRev)) {
       return "DELETE";
     } else {
       return "UPDATE";
@@ -210,13 +270,13 @@
     return !refsFilter.match(refName);
   }
 
-  private void fire(ReferenceUpdatedEvent event) {
+  private void fire(ReferenceBatchUpdatedEvent event) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     fire(event, state);
     state.markAllFetchTasksScheduled();
   }
 
-  private void fire(ReferenceUpdatedEvent event, ReplicationState state) {
+  private void fire(ReferenceBatchUpdatedEvent event, ReplicationState state) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
@@ -238,12 +298,7 @@
 
       final Consumer<Source> callFunction =
           callFunction(
-              Project.nameKey(event.projectName()),
-              event.objectId(),
-              event.refName(),
-              event.eventCreatedOn(),
-              event.isDelete(),
-              state);
+              Project.nameKey(event.projectName()), event.refs(), event.eventCreatedOn(), state);
       fetchCallsPool
           .submit(() -> allSources.parallelStream().forEach(callFunction))
           .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
@@ -263,13 +318,10 @@
 
   private Consumer<Source> callFunction(
       NameKey project,
-      ObjectId objectId,
-      String refName,
+      List<ReferenceUpdatedEvent> refs,
       long eventCreatedOn,
-      boolean isDelete,
       ReplicationState state) {
-    CallFunction call =
-        getCallFunction(project, objectId, refName, eventCreatedOn, isDelete, state);
+    CallFunction call = getCallFunction(project, refs, eventCreatedOn, state);
 
     return (source) -> {
       boolean callSuccessful;
@@ -278,161 +330,250 @@
       } catch (Exception e) {
         repLog.warn(
             String.format(
-                "Failed to apply object %s on project %s:%s, falling back to git fetch",
-                objectId.name(), project, refName),
+                "Failed to batch apply object %s on project %s, falling back to git fetch",
+                refs.stream()
+                    .map(event -> String.format("%s:%s", event.refName(), event.objectId()))
+                    .collect(Collectors.joining(",")),
+                project),
             e);
         callSuccessful = false;
       }
 
       if (!callSuccessful) {
-        callFetch(source, project, refName, state);
+        if (source.enableBatchedRefs()) {
+          callBatchFetch(source, project, refs, state);
+        } else {
+          callFetch(source, project, refs, state);
+        }
       }
     };
   }
 
   private CallFunction getCallFunction(
       NameKey project,
-      ObjectId objectId,
-      String refName,
+      List<ReferenceUpdatedEvent> refs,
       long eventCreatedOn,
-      boolean isDelete,
       ReplicationState state) {
-    if (isDelete) {
-      return ((source) ->
-          callSendObject(source, project, refName, eventCreatedOn, isDelete, null, state));
-    }
 
     try {
-      Optional<RevisionData> revisionData =
-          revReaderProvider.get().read(project, objectId, refName, 0);
-      repLog.info(
-          "RevisionData is {} for {}:{}",
-          revisionData.map(RevisionData::toString).orElse("ABSENT"),
-          project,
-          refName);
+      List<BatchApplyObjectData> refsBatch =
+          refs.stream()
+              .map(ref -> toBatchApplyObject(project, ref, state))
+              .collect(Collectors.toList());
 
-      if (revisionData.isPresent()) {
-        return ((source) ->
-            callSendObject(
-                source,
-                project,
-                refName,
-                eventCreatedOn,
-                isDelete,
-                Arrays.asList(revisionData.get()),
-                state));
+      if (!containsLargeRef(refsBatch)) {
+        return ((source) -> callBatchSendObject(source, project, refsBatch, eventCreatedOn, state));
       }
-    } catch (InvalidObjectIdException | IOException e) {
+    } catch (UncheckedIOException e) {
+      stateLog.error("Falling back to calling fetch", e, state);
+    }
+    return ((source) -> callBatchFetch(source, project, refs, state));
+  }
+
+  private BatchApplyObjectData toBatchApplyObject(
+      NameKey project, ReferenceUpdatedEvent event, ReplicationState state) {
+    if (event.isDelete()) {
+      Optional<RevisionData> noRevisionData = Optional.empty();
+      return BatchApplyObjectData.create(event.refName(), noRevisionData, event.isDelete());
+    }
+    try {
+      Optional<RevisionData> maybeRevisionData =
+          revReaderProvider.get().read(project, event.objectId(), event.refName(), 0);
+      return BatchApplyObjectData.create(event.refName(), maybeRevisionData, event.isDelete());
+    } catch (IOException e) {
       stateLog.error(
           String.format(
               "Exception during reading ref: %s, project:%s, message: %s",
-              refName, project.get(), e.getMessage()),
+              event.refName(), project.get(), e.getMessage()),
           e,
           state);
+      throw new UncheckedIOException(e);
     }
-
-    return (source) -> callFetch(source, project, refName, state);
   }
 
-  private boolean callSendObject(
-      Source source,
+  private boolean containsLargeRef(List<BatchApplyObjectData> batchApplyObjectData) {
+    return batchApplyObjectData.stream().anyMatch(e -> e.revisionData().isEmpty() && !e.isDelete());
+  }
+
+  private Optional<HttpResult> callSendObject(
+      FetchApiClient fetchClient,
+      String remoteName,
+      URIish uri,
       NameKey project,
       String refName,
       long eventCreatedOn,
       boolean isDelete,
-      List<RevisionData> revision,
+      List<RevisionData> revision)
+      throws MissingParentObjectException, IOException {
+    String revisionDataStr =
+        Optional.ofNullable(revision).orElse(ImmutableList.of()).stream()
+            .map(RevisionData::toString)
+            .collect(Collectors.joining(","));
+    repLog.info(
+        "Pull replication REST API apply object to {} for {}:{} - {}",
+        uri,
+        project,
+        refName,
+        revisionDataStr);
+    Context<String> apiTimer = applyObjectMetrics.startEnd2End(remoteName);
+    HttpResult result =
+        isDelete
+            ? fetchClient.callSendObject(project, refName, eventCreatedOn, isDelete, null, uri)
+            : fetchClient.callSendObjects(project, refName, eventCreatedOn, revision, uri);
+    repLog.info(
+        "Pull replication REST API apply object to {} COMPLETED for {}:{} - {}, HTTP Result:"
+            + " {} - time:{} ms",
+        uri,
+        project,
+        refName,
+        revisionDataStr,
+        result,
+        apiTimer.stop() / 1000000.0);
+
+    return Optional.of(result);
+  }
+
+  private boolean callBatchSendObject(
+      Source source,
+      NameKey project,
+      List<BatchApplyObjectData> refsBatch,
+      long eventCreatedOn,
       ReplicationState state)
       throws MissingParentObjectException {
-    boolean resultIsSuccessful = true;
-    if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
-      for (String apiUrl : source.getApis()) {
-        try {
-          URIish uri = new URIish(apiUrl);
-          FetchApiClient fetchClient = fetchClientFactory.create(source);
+    boolean batchResultSuccessful = true;
+
+    List<BatchApplyObjectData> filteredRefsBatch =
+        refsBatch.stream()
+            .filter(r -> source.wouldFetchProject(project) && source.wouldFetchRef(r.refName()))
+            .collect(Collectors.toList());
+
+    String batchApplyObjectStr =
+        filteredRefsBatch.stream()
+            .map(BatchApplyObjectData::toString)
+            .collect(Collectors.joining(","));
+    FetchApiClient fetchClient = fetchClientFactory.create(source);
+    String remoteName = source.getRemoteConfigName();
+
+    for (String apiUrl : source.getApis()) {
+      try {
+        boolean resultSuccessful = true;
+        Optional<HttpResult> result = Optional.empty();
+        URIish uri = new URIish(apiUrl);
+        if (source.enableBatchedRefs()) {
           repLog.info(
-              "Pull replication REST API apply object to {} for {}:{} - {}",
+              "Pull replication REST API batch apply object to {} for {}:[{}]",
               apiUrl,
               project,
-              refName,
-              revision);
-          Context<String> apiTimer = applyObjectMetrics.startEnd2End(source.getRemoteConfigName());
-          HttpResult result =
-              isDelete
-                  ? fetchClient.callSendObject(
-                      project, refName, eventCreatedOn, isDelete, null, uri)
-                  : fetchClient.callSendObjects(project, refName, eventCreatedOn, revision, uri);
-          boolean resultSuccessful = result.isSuccessful();
+              batchApplyObjectStr);
+          Context<String> apiTimer = applyObjectMetrics.startEnd2End(remoteName);
+          result =
+              Optional.of(
+                  fetchClient.callBatchSendObject(project, filteredRefsBatch, eventCreatedOn, uri));
+          resultSuccessful = HttpResultUtils.isSuccessful(result);
           repLog.info(
-              "Pull replication REST API apply object to {} COMPLETED for {}:{} - {}, HTTP Result:"
+              "Pull replication REST API batch apply object to {} COMPLETED for {}:[{}], HTTP  Result:"
                   + " {} - time:{} ms",
               apiUrl,
               project,
-              refName,
-              revision,
-              result,
+              batchApplyObjectStr,
+              HttpResultUtils.status(result),
               apiTimer.stop() / 1000000.0);
-
-          if (!resultSuccessful
-              && result.isProjectMissing(project)
-              && source.isCreateMissingRepositories()) {
-            result = initProject(project, uri, fetchClient, result);
-            repLog.info("Missing project {} created, HTTP Result:{}", project, result);
-          }
-
-          if (!resultSuccessful) {
-            if (result.isParentObjectMissing()) {
-
-              if ((RefNames.isNoteDbMetaRef(refName) || applyObjectsRefsFilter.match(refName))
-                  && revision.size() == 1) {
-                List<RevisionData> allRevisions =
-                    fetchWholeMetaHistory(project, refName, revision.get(0));
-                repLog.info(
-                    "Pull replication REST API apply object to {} for {}:{} - {}",
-                    apiUrl,
+        } else {
+          repLog.info(
+              "REST API batch apply object not enabled for source {}, using REST API apply object to {} for {}:[{}]",
+              remoteName,
+              apiUrl,
+              project,
+              batchApplyObjectStr);
+          for (BatchApplyObjectData batchApplyObject : filteredRefsBatch) {
+            result =
+                callSendObject(
+                    fetchClient,
+                    remoteName,
+                    uri,
                     project,
-                    refName,
-                    allRevisions);
-                return callSendObject(
-                    source, project, refName, eventCreatedOn, isDelete, allRevisions, state);
-              }
+                    batchApplyObject.refName(),
+                    eventCreatedOn,
+                    batchApplyObject.isDelete(),
+                    batchApplyObject.revisionData().map(ImmutableList::of).orElse(null));
 
+            resultSuccessful = HttpResultUtils.isSuccessful(result);
+            if (!resultSuccessful) {
+              break;
+            }
+          }
+        }
+
+        if (!resultSuccessful
+            && HttpResultUtils.isProjectMissing(result, project)
+            && source.isCreateMissingRepositories()) {
+          result = initProject(project, uri, fetchClient);
+          repLog.info(
+              "Missing project {} created, HTTP Result:{}",
+              project,
+              HttpResultUtils.status(result));
+        }
+
+        if (!resultSuccessful && HttpResultUtils.isParentObjectMissing(result)) {
+          resultSuccessful = true;
+          for (BatchApplyObjectData batchApplyObject : filteredRefsBatch) {
+            String refName = batchApplyObject.refName();
+            if ((RefNames.isNoteDbMetaRef(refName) || applyObjectsRefsFilter.match(refName))
+                && batchApplyObject.revisionData().isPresent()) {
+
+              Optional<RevisionData> maybeRevisionData = batchApplyObject.revisionData();
+              List<RevisionData> allRevisions =
+                  fetchWholeMetaHistory(project, refName, maybeRevisionData.get());
+
+              Optional<HttpResult> sendObjectResult =
+                  callSendObject(
+                      fetchClient,
+                      remoteName,
+                      uri,
+                      project,
+                      refName,
+                      eventCreatedOn,
+                      batchApplyObject.isDelete(),
+                      allRevisions);
+              resultSuccessful = HttpResultUtils.isSuccessful(sendObjectResult);
+              if (!resultSuccessful) {
+                break;
+              }
+            } else {
               throw new MissingParentObjectException(
                   project, refName, source.getRemoteConfigName());
             }
           }
-
-          resultIsSuccessful &= resultSuccessful;
-        } catch (URISyntaxException e) {
-          repLog.warn(
-              "Pull replication REST API apply object to {} *FAILED* for {}:{} - {}",
-              apiUrl,
-              project,
-              refName,
-              revision,
-              e);
-          stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
-          resultIsSuccessful = false;
-        } catch (IOException e) {
-          repLog.warn(
-              "Pull replication REST API apply object to {} *FAILED* for {}:{} - {}",
-              apiUrl,
-              project,
-              refName,
-              revision,
-              e);
-          stateLog.error(
-              String.format(
-                  "Exception during the pull replication fetch rest api call. Endpoint url:%s,"
-                      + " message:%s",
-                  apiUrl, e.getMessage()),
-              e,
-              state);
-          resultIsSuccessful = false;
         }
+
+        batchResultSuccessful &= resultSuccessful;
+      } catch (URISyntaxException e) {
+        repLog.warn(
+            "Pull replication REST API batch apply object to {} *FAILED* for {}:[{}]",
+            apiUrl,
+            project,
+            batchApplyObjectStr,
+            e);
+        stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+        batchResultSuccessful = false;
+      } catch (IOException | IllegalArgumentException e) {
+        repLog.warn(
+            "Pull replication REST API batch apply object to {} *FAILED* for {}:[{}]",
+            apiUrl,
+            project,
+            batchApplyObjectStr,
+            e);
+        stateLog.error(
+            String.format(
+                "Exception during the pull replication fetch rest api call. Endpoint url:%s,"
+                    + " message:%s",
+                apiUrl, e.getMessage()),
+            e,
+            state);
+        batchResultSuccessful = false;
       }
     }
-
-    return resultIsSuccessful;
+    return batchResultSuccessful;
   }
 
   private List<RevisionData> fetchWholeMetaHistory(
@@ -457,53 +598,129 @@
     return revisionDataBuilder.build();
   }
 
-  private boolean callFetch(
-      Source source, Project.NameKey project, String refName, ReplicationState state) {
-    boolean resultIsSuccessful = true;
-    if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
-      for (String apiUrl : source.getApis()) {
-        try {
-          URIish uri = new URIish(apiUrl);
-          FetchApiClient fetchClient = fetchClientFactory.create(source);
-          repLog.info("Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
-          Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName());
-          HttpResult result = fetchClient.callFetch(project, refName, uri);
-          long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop());
-          boolean resultSuccessful = result.isSuccessful();
-          repLog.info(
-              "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:"
-                  + " {} - time:{} ms",
-              apiUrl,
-              project,
-              refName,
-              result,
-              elapsedMs);
-          if (!resultSuccessful
-              && result.isProjectMissing(project)
-              && source.isCreateMissingRepositories()) {
-            result = initProject(project, uri, fetchClient, result);
-          }
-          if (!resultSuccessful) {
-            stateLog.warn(
-                String.format(
-                    "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
-                    apiUrl, result.getMessage().orElse("unknown")),
-                state);
-          }
+  private boolean callBatchFetch(
+      Source source,
+      Project.NameKey project,
+      List<ReferenceUpdatedEvent> refs,
+      ReplicationState state) {
 
-          resultIsSuccessful &= result.isSuccessful();
-        } catch (URISyntaxException e) {
-          stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
-          resultIsSuccessful = false;
-        } catch (Exception e) {
-          stateLog.error(
+    boolean resultIsSuccessful = true;
+
+    List<String> filteredRefs =
+        refs.stream()
+            .map(ReferenceUpdatedEvent::refName)
+            .filter(refName -> source.wouldFetchProject(project) && source.wouldFetchRef(refName))
+            .collect(Collectors.toList());
+
+    String refsStr = String.join(",", filteredRefs);
+    FetchApiClient fetchClient = fetchClientFactory.create(source);
+
+    for (String apiUrl : source.getApis()) {
+      try {
+        URIish uri = new URIish(apiUrl);
+        Optional<HttpResult> result = Optional.empty();
+        repLog.info(
+            "Pull replication REST API batch fetch to {} for {}:[{}]", apiUrl, project, refsStr);
+        long startTime = System.currentTimeMillis();
+        result = Optional.of(fetchClient.callBatchFetch(project, filteredRefs, uri));
+        long endTime = System.currentTimeMillis();
+        boolean resultSuccessful = HttpResultUtils.isSuccessful(result);
+        repLog.info(
+            "Pull replication REST API batch fetch to {} COMPLETED for {}:[{}], HTTP Result:"
+                + " {} - time:{} ms",
+            apiUrl,
+            project,
+            refsStr,
+            HttpResultUtils.status(result),
+            endTime - startTime);
+        if (!resultSuccessful
+            && HttpResultUtils.isProjectMissing(result, project)
+            && source.isCreateMissingRepositories()) {
+          result = initProject(project, uri, fetchClient);
+          resultSuccessful = HttpResultUtils.isSuccessful(result);
+        }
+        if (!resultSuccessful) {
+          stateLog.warn(
               String.format(
-                  "Exception during the pull replication fetch rest api call. Endpoint url:%s,"
-                      + " message:%s",
-                  apiUrl, e.getMessage()),
-              e,
+                  "Pull replication REST API batch fetch call failed. Endpoint url: %s, reason:%s",
+                  apiUrl, HttpResultUtils.errorMsg(result)),
               state);
-          resultIsSuccessful = false;
+        }
+        resultIsSuccessful &= resultSuccessful;
+      } catch (URISyntaxException e) {
+        stateLog.error(
+            String.format("Cannot parse pull replication batch api url:%s", apiUrl), state);
+        resultIsSuccessful = false;
+      } catch (Exception e) {
+        stateLog.error(
+            String.format(
+                "Exception during the pull replication batch fetch rest api call. Endpoint url:%s,"
+                    + " message:%s",
+                apiUrl, e.getMessage()),
+            e,
+            state);
+        resultIsSuccessful = false;
+      }
+    }
+
+    return resultIsSuccessful;
+  }
+
+  private boolean callFetch(
+      Source source,
+      Project.NameKey project,
+      List<ReferenceUpdatedEvent> refs,
+      ReplicationState state) {
+    boolean resultIsSuccessful = true;
+    for (ReferenceUpdatedEvent refEvent : refs) {
+      String refName = refEvent.refName();
+      if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
+        for (String apiUrl : source.getApis()) {
+          try {
+            URIish uri = new URIish(apiUrl);
+            FetchApiClient fetchClient = fetchClientFactory.create(source);
+            repLog.info(
+                "Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
+            long startTime = System.currentTimeMillis();
+            Optional<HttpResult> result = Optional.of(fetchClient.callFetch(project, refName, uri));
+            long endTime = System.currentTimeMillis();
+            boolean resultSuccessful = HttpResultUtils.isSuccessful(result);
+            repLog.info(
+                "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:"
+                    + " {} - time: {} ms",
+                apiUrl,
+                project,
+                refName,
+                HttpResultUtils.status(result),
+                endTime - startTime);
+            if (!resultSuccessful
+                && HttpResultUtils.isProjectMissing(result, project)
+                && source.isCreateMissingRepositories()) {
+              result = initProject(project, uri, fetchClient);
+            }
+            if (!resultSuccessful) {
+              stateLog.warn(
+                  String.format(
+                      "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
+                      apiUrl, HttpResultUtils.errorMsg(result)),
+                  state);
+            }
+
+            resultIsSuccessful &= HttpResultUtils.isSuccessful(result);
+          } catch (URISyntaxException e) {
+            stateLog.error(
+                String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+            resultIsSuccessful = false;
+          } catch (Exception e) {
+            stateLog.error(
+                String.format(
+                    "Exception during the pull replication fetch rest api call. Endpoint url:%s,"
+                        + " message:%s",
+                    apiUrl, e.getMessage()),
+                e,
+                state);
+            resultIsSuccessful = false;
+          }
         }
       }
     }
@@ -515,12 +732,12 @@
     return maxRetries == 0 || attempt < maxRetries;
   }
 
-  private HttpResult initProject(
-      Project.NameKey project, URIish uri, FetchApiClient fetchClient, HttpResult result)
-      throws IOException, ClientProtocolException {
+  private Optional<HttpResult> initProject(
+      Project.NameKey project, URIish uri, FetchApiClient fetchClient) throws IOException {
     HttpResult initProjectResult = fetchClient.initProject(project, uri);
+    Optional<HttpResult> result = Optional.empty();
     if (initProjectResult.isSuccessful()) {
-      result = fetchClient.callFetch(project, FetchOne.ALL_REFS, uri);
+      result = Optional.of(fetchClient.callFetch(project, FetchOne.ALL_REFS, uri));
     } else {
       String errorMessage = initProjectResult.getMessage().map(e -> " - Error: " + e).orElse("");
       repLog.error("Cannot create project " + project + errorMessage);
@@ -530,8 +747,14 @@
 
   private void fireBeforeStartupEvents() {
     Set<String> eventsReplayed = new HashSet<>();
-    for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
-      String eventKey = String.format("%s:%s", event.projectName(), event.refName());
+    for (ReferenceBatchUpdatedEvent event : beforeStartupEventsQueue) {
+      String eventKey =
+          String.format(
+              "%s:%s",
+              event.projectName(),
+              event.refs().stream()
+                  .map(ReferenceUpdatedEvent::refName)
+                  .collect(Collectors.joining()));
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
         fire(event);
@@ -556,6 +779,22 @@
   }
 
   @AutoValue
+  abstract static class ReferenceBatchUpdatedEvent {
+    /** Creates a batch event for {@code projectName} carrying the given per-ref updates. */
+    static ReferenceBatchUpdatedEvent create(
+        String projectName, List<ReferenceUpdatedEvent> refs, long eventCreatedOn) {
+      return new AutoValue_ReplicationQueue_ReferenceBatchUpdatedEvent(
+          projectName, refs, eventCreatedOn);
+    }
+    /** Name of the project this batch of ref updates was created for. */
+    public abstract String projectName();
+    /** The individual ref updates grouped into this batch. */
+    public abstract List<ReferenceUpdatedEvent> refs();
+    /** Creation timestamp of the event; NOTE(review): unit is not visible here, confirm. */
+    public abstract long eventCreatedOn();
+  }
+
+  @AutoValue
   abstract static class ReferenceUpdatedEvent {
 
     static ReferenceUpdatedEvent create(
@@ -568,13 +807,13 @@
           projectName, refName, objectId, eventCreatedOn, isDelete);
     }
 
-    static ReferenceUpdatedEvent from(RefUpdatedEvent event) {
+    static ReferenceUpdatedEvent from(RefUpdateAttribute refUpdateAttribute, long eventCreatedOn) {
       return ReferenceUpdatedEvent.create(
-          event.refUpdate.get().project,
-          event.getRefName(),
-          ObjectId.fromString(event.refUpdate.get().newRev),
-          event.eventCreatedOn,
-          ZEROS_OBJECTID.equals(event.refUpdate.get().newRev));
+          refUpdateAttribute.project,
+          refUpdateAttribute.refName,
+          ObjectId.fromString(refUpdateAttribute.newRev),
+          eventCreatedOn,
+          ZEROS_OBJECTID.equals(refUpdateAttribute.newRev));
     }
 
     public abstract String projectName();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 3efd81c..fb8c239 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -891,6 +891,10 @@
     return config.replicateProjectDeletions();
   }
 
+  public boolean enableBatchedRefs() {
+    return config.enableBatchedRefs();
+  }
+
   void scheduleUpdateHead(String apiUrl, Project.NameKey project, String newHead) {
     try {
       URIish apiURI = new URIish(apiUrl);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 0a22a5a..1921e7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -16,6 +16,7 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
 import java.util.concurrent.TimeUnit;
@@ -23,6 +24,7 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 
 public class SourceConfiguration implements RemoteConfiguration {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   static final int DEFAULT_REPLICATION_DELAY = 4;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
@@ -56,6 +58,7 @@
   private int slowLatencyThreshold;
   private boolean useCGitClient;
   private int refsBatchSize;
+  private boolean enableBatchedRefs;
 
   public SourceConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -110,6 +113,16 @@
                 "shutDownDrainTimeout",
                 DEFAULT_DRAIN_SHUTDOWN_TIMEOUT_SECS,
                 TimeUnit.SECONDS);
+
+    enableBatchedRefs = cfg.getBoolean("remote", name, "enableBatchedRefs", false);
+    if (!enableBatchedRefs) {
+      logger.atWarning().log(
+          "You haven't enabled batched refs in the %s node, as such you are not "
+              + "leveraging the performance improvements introduced by the batch-apply-object API. Consider "
+              + "upgrading the plugin to the latest version and consult the plugin's documentation for more "
+              + "details on the `enableBatchedRefs` configuration.",
+          name);
+    }
   }
 
   @Override
@@ -237,4 +250,14 @@
   public int getShutDownDrainTimeout() {
     return shutDownDrainTimeout;
   }
+
+  /**
+   * Returns whether batched refs are enabled for this remote.
+   *
+   * @return the {@code enableBatchedRefs} boolean read from the {@code remote} config section;
+   *     {@code false} when the option is not set.
+   */
+  public boolean enableBatchedRefs() {
+    return enableBatchedRefs;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index 968a03c..a0fae22 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -23,6 +23,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -84,7 +85,8 @@
       RevisionData revisionsData,
       String sourceLabel,
       long eventCreatedOn)
-      throws IOException, RefUpdateException, MissingParentObjectException {
+      throws IOException, RefUpdateException, MissingParentObjectException,
+          ResourceNotFoundException {
     applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel, eventCreatedOn);
   }
 
@@ -94,7 +96,8 @@
       RevisionData[] revisionsData,
       String sourceLabel,
       long eventCreatedOn)
-      throws IOException, RefUpdateException, MissingParentObjectException {
+      throws IOException, RefUpdateException, MissingParentObjectException,
+          ResourceNotFoundException {
 
     repLog.info(
         "Apply object from {} for {}:{} - {}",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java
new file mode 100644
index 0000000..717b70e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * REST view that applies a batch of {@link RevisionInput}s to a project by delegating each
+ * element, in request order, to {@link ApplyObjectAction}.
+ */
+@Singleton
+class BatchApplyObjectAction implements RestModifyView<ProjectResource, List<RevisionInput>> {
+
+  private final ApplyObjectAction applyObjectAction;
+
+  @Inject
+  BatchApplyObjectAction(ApplyObjectAction applyObjectAction) {
+    this.applyObjectAction = applyObjectAction;
+  }
+
+  /**
+   * Applies every input of the batch to {@code resource}, one at a time and in request order.
+   *
+   * @param resource the project the objects are applied to.
+   * @param inputs the revisions to apply; must be a non-null list without null elements.
+   * @return an OK response wrapping the individual {@link ApplyObjectAction} responses, in
+   *     input order.
+   * @throws BadRequestException if {@code inputs} is null or contains a null element.
+   * @throws RestApiException if applying any input fails; remaining inputs are not attempted.
+   */
+  @Override
+  public Response<?> apply(ProjectResource resource, List<RevisionInput> inputs)
+      throws RestApiException {
+    // A request body of JSON null (or a null array entry) deserializes to null; reject it
+    // explicitly instead of failing with a NullPointerException further down.
+    if (inputs == null || inputs.stream().anyMatch(input -> input == null)) {
+      throw new BadRequestException("Expected a JSON array of revision inputs");
+    }
+
+    repLog.info(
+        "Batch Apply object API from {} for refs {}",
+        resource.getNameKey(),
+        inputs.stream().map(RevisionInput::getRefName).collect(Collectors.joining(",")));
+
+    List<Response<?>> allResponses = new ArrayList<>();
+    for (RevisionInput input : inputs) {
+      Response<?> individualResponse = applyObjectAction.apply(resource, input);
+      allResponses.add(individualResponse);
+    }
+
+    return Response.ok(allResponses);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java
new file mode 100644
index 0000000..c6ad47d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchAction.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * REST view that handles a batch of fetch {@link Input}s for a project by delegating each
+ * element, in request order, to {@link FetchAction}.
+ */
+@Singleton
+public class BatchFetchAction implements RestModifyView<ProjectResource, List<Input>> {
+  private final FetchAction fetchAction;
+
+  @Inject
+  public BatchFetchAction(FetchAction fetchAction) {
+    this.fetchAction = fetchAction;
+  }
+
+  /**
+   * Runs {@link FetchAction} for every input of the batch, one at a time and in request order.
+   *
+   * @param resource the project the fetch requests are issued for.
+   * @param inputs the fetch requests; must be a non-null list without null elements.
+   * @return an OK response wrapping the individual {@link FetchAction} responses, in input
+   *     order.
+   * @throws BadRequestException if {@code inputs} is null or contains a null element.
+   * @throws RestApiException if {@link FetchAction} fails an input; remaining ones are skipped.
+   */
+  @Override
+  public Response<?> apply(ProjectResource resource, List<Input> inputs) throws RestApiException {
+    // A request body of JSON null (or a null array entry) deserializes to null; reject it
+    // explicitly instead of failing with a NullPointerException further down.
+    if (inputs == null || inputs.stream().anyMatch(input -> input == null)) {
+      throw new BadRequestException("Expected a JSON array of fetch inputs");
+    }
+
+    List<Response<?>> allResponses = new ArrayList<>();
+    for (Input input : inputs) {
+      Response<?> res = fetchAction.apply(resource, input);
+      allResponses.add(res);
+    }
+
+    return Response.ok(allResponses);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
index 68dac6a..64a0bd8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -125,7 +125,9 @@
     return (requestURI.contains(pluginName)
             && (requestURI.endsWith(String.format("/%s~apply-object", pluginName))
                 || requestURI.endsWith(String.format("/%s~apply-objects", pluginName))
+                || requestURI.endsWith(String.format("/%s~batch-apply-object", pluginName))
                 || requestURI.endsWith(String.format("/%s~fetch", pluginName))
+                || requestURI.endsWith(String.format("/%s~batch-fetch", pluginName))
                 || requestURI.endsWith(String.format("/%s~delete-project", pluginName))
                 || requestURI.contains(String.format("/%s/init-project/", pluginName))))
         || (requestURI.matches(String.format(".*/projects/[^/]+/%s~HEAD", pluginName))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
index adb333c..6a21104 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
 import com.google.gerrit.extensions.api.access.PluginPermission;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
@@ -28,9 +30,12 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.replication.LocalFS;
+import com.googlesource.gerrit.plugins.deleteproject.cache.CacheDeleteHandler;
+import com.googlesource.gerrit.plugins.deleteproject.fs.RepositoryDelete;
 import com.googlesource.gerrit.plugins.replication.pull.GerritConfigOps;
+import java.io.IOException;
 import java.util.Optional;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.transport.URIish;
 
 @Singleton
@@ -44,15 +49,21 @@
   private final Provider<CurrentUser> userProvider;
   private final GerritConfigOps gerritConfigOps;
   private final PermissionBackend permissionBackend;
+  private final RepositoryDelete repositoryDelete;
+  private final CacheDeleteHandler cacheDeleteHandler;
 
   @Inject
   ProjectDeletionAction(
       GerritConfigOps gerritConfigOps,
       PermissionBackend permissionBackend,
-      Provider<CurrentUser> userProvider) {
+      Provider<CurrentUser> userProvider,
+      RepositoryDelete repositoryDelete,
+      CacheDeleteHandler cacheDeleteHandler) {
     this.gerritConfigOps = gerritConfigOps;
     this.permissionBackend = permissionBackend;
     this.userProvider = userProvider;
+    this.repositoryDelete = repositoryDelete;
+    this.cacheDeleteHandler = cacheDeleteHandler;
   }
 
   @Override
@@ -69,11 +80,24 @@
         gerritConfigOps.getGitRepositoryURI(String.format("%s.git", projectResource.getName()));
 
     if (maybeRepoURI.isPresent()) {
-      if (new LocalFS(maybeRepoURI.get()).deleteProject(projectResource.getNameKey())) {
+      try {
+        // reuse repo deletion logic from delete-project plugin, as it can successfully delete
+        // the git directories hosted on nfs.
+        repositoryDelete.execute(projectResource.getNameKey());
+        // delete the project from the local project cache, otherwise future ops
+        // will fail as the replica will think that the project still exists locally.
+        cacheDeleteHandler.delete(projectResource.getProjectState().getProject());
+        repLog.info(
+            "Deleted local repository {} and removed it from the local project cache",
+            projectResource.getName());
         return Response.ok();
+      } catch (RepositoryNotFoundException e) {
+        throw new ResourceNotFoundException(
+            String.format("Repository %s not found", projectResource.getName()), e);
+      } catch (IOException e) {
+        throw new UnprocessableEntityException(
+            String.format("Could not delete project %s", projectResource.getName()));
       }
-      throw new UnprocessableEntityException(
-          String.format("Could not delete project %s", projectResource.getName()));
     }
     throw new ResourceNotFoundException(
         String.format("Could not compute URI for repo: %s", projectResource.getName()));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 8711379..47a75ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -22,11 +22,11 @@
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.index.project.ProjectIndexer;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.permissions.GlobalPermission;
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
@@ -50,18 +50,18 @@
   private final GerritConfigOps gerritConfigOps;
   private final Provider<CurrentUser> userProvider;
   private final PermissionBackend permissionBackend;
-  private final ProjectIndexer projectIndexer;
+  private final ProjectCache projectCache;
 
   @Inject
   ProjectInitializationAction(
       GerritConfigOps gerritConfigOps,
       Provider<CurrentUser> userProvider,
       PermissionBackend permissionBackend,
-      ProjectIndexer projectIndexer) {
+      ProjectCache projectCache) {
     this.gerritConfigOps = gerritConfigOps;
     this.userProvider = userProvider;
     this.permissionBackend = permissionBackend;
-    this.projectIndexer = projectIndexer;
+    this.projectCache = projectCache;
   }
 
   @Override
@@ -111,7 +111,7 @@
     LocalFS localFS = new LocalFS(maybeUri.get());
     Project.NameKey projectNameKey = Project.NameKey.parse(projectName);
     if (localFS.createProject(projectNameKey, RefNames.HEAD)) {
-      projectIndexer.index(projectNameKey);
+      projectCache.evictAndReindex(projectNameKey);
       return true;
     }
     return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationEndpoints.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationEndpoints.java
index fc97945..9fdf5d9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationEndpoints.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationEndpoints.java
@@ -33,7 +33,10 @@
   @UsedAt(PLUGIN_MULTI_SITE)
   public static final String APPLY_OBJECTS_API_ENDPOINT = "apply-objects";
 
+  public static final String BATCH_APPLY_OBJECT_API_ENDPOINT = "batch-apply-object";
+
   public static final String FETCH_ENDPOINT = "fetch";
+  public static final String BATCH_FETCH_ENDPOINT = "batch-fetch";
   public static final String INIT_PROJECT_ENDPOINT = "init-project";
   public static final String DELETE_PROJECT_ENDPOINT = "delete-project";
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
index 40e39ad..207456a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
@@ -22,10 +22,12 @@
 import static javax.servlet.http.HttpServletResponse.SC_CREATED;
 import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN;
 import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
 import static javax.servlet.http.HttpServletResponse.SC_UNAUTHORIZED;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.common.reflect.TypeToken;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.api.projects.HeadInput;
@@ -62,6 +64,8 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Type;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -82,8 +86,10 @@
       Pattern.compile(".*/init-project/([^/]+.git)");
 
   private FetchAction fetchAction;
+  private BatchFetchAction batchFetchAction;
   private ApplyObjectAction applyObjectAction;
   private ApplyObjectsAction applyObjectsAction;
+  private BatchApplyObjectAction batchApplyObjectAction;
   private ProjectInitializationAction projectInitializationAction;
   private UpdateHeadAction updateHEADAction;
   private ProjectDeletionAction projectDeletionAction;
@@ -95,8 +101,10 @@
   @Inject
   public PullReplicationFilter(
       FetchAction fetchAction,
+      BatchFetchAction batchFetchAction,
       ApplyObjectAction applyObjectAction,
       ApplyObjectsAction applyObjectsAction,
+      BatchApplyObjectAction batchApplyObjectAction,
       ProjectInitializationAction projectInitializationAction,
       UpdateHeadAction updateHEADAction,
       ProjectDeletionAction projectDeletionAction,
@@ -104,8 +112,10 @@
       @PluginName String pluginName,
       Provider<CurrentUser> currentUserProvider) {
     this.fetchAction = fetchAction;
+    this.batchFetchAction = batchFetchAction;
     this.applyObjectAction = applyObjectAction;
     this.applyObjectsAction = applyObjectsAction;
+    this.batchApplyObjectAction = batchApplyObjectAction;
     this.projectInitializationAction = projectInitializationAction;
     this.updateHEADAction = updateHEADAction;
     this.projectDeletionAction = projectDeletionAction;
@@ -129,12 +139,18 @@
       if (isFetchAction(httpRequest)) {
         failIfcurrentUserIsAnonymous();
         writeResponse(httpResponse, doFetch(httpRequest));
+      } else if (isBatchFetchAction(httpRequest)) {
+        failIfcurrentUserIsAnonymous();
+        writeResponse(httpResponse, doBatchFetch(httpRequest));
       } else if (isApplyObjectAction(httpRequest)) {
         failIfcurrentUserIsAnonymous();
         writeResponse(httpResponse, doApplyObject(httpRequest));
       } else if (isApplyObjectsAction(httpRequest)) {
         failIfcurrentUserIsAnonymous();
         writeResponse(httpResponse, doApplyObjects(httpRequest));
+      } else if (isBatchApplyObjectsAction(httpRequest)) {
+        failIfcurrentUserIsAnonymous();
+        writeResponse(httpResponse, doBatchApplyObject(httpRequest));
       } else if (isInitProjectAction(httpRequest)) {
         failIfcurrentUserIsAnonymous();
         if (!checkAcceptHeader(httpRequest, httpResponse)) {
@@ -169,9 +185,12 @@
     } catch (ResourceConflictException e) {
       RestApiServlet.replyError(
           httpRequest, httpResponse, SC_CONFLICT, e.getMessage(), e.caching(), e);
-    } catch (InitProjectException | ResourceNotFoundException e) {
+    } catch (InitProjectException e) {
       RestApiServlet.replyError(
           httpRequest, httpResponse, SC_INTERNAL_SERVER_ERROR, e.getMessage(), e.caching(), e);
+    } catch (ResourceNotFoundException e) {
+      RestApiServlet.replyError(
+          httpRequest, httpResponse, SC_NOT_FOUND, e.getMessage(), e.caching(), e);
     } catch (NoSuchElementException e) {
       RestApiServlet.replyError(
           httpRequest, httpResponse, SC_BAD_REQUEST, "Project name not present in the url", e);
@@ -209,7 +228,7 @@
   @SuppressWarnings("unchecked")
   private Response<String> doApplyObject(HttpServletRequest httpRequest)
       throws RestApiException, IOException, PermissionBackendException {
-    RevisionInput input = readJson(httpRequest, TypeLiteral.get(RevisionInput.class));
+    RevisionInput input = readJson(httpRequest, TypeLiteral.get(RevisionInput.class).getType());
     IdString id = getProjectName(httpRequest).get();
 
     return (Response<String>) applyObjectAction.apply(parseProjectResource(id), input);
@@ -218,15 +237,26 @@
   @SuppressWarnings("unchecked")
   private Response<String> doApplyObjects(HttpServletRequest httpRequest)
       throws RestApiException, IOException, PermissionBackendException {
-    RevisionsInput input = readJson(httpRequest, TypeLiteral.get(RevisionsInput.class));
+    RevisionsInput input = readJson(httpRequest, TypeLiteral.get(RevisionsInput.class).getType());
     IdString id = getProjectName(httpRequest).get();
 
     return (Response<String>) applyObjectsAction.apply(parseProjectResource(id), input);
   }
 
   @SuppressWarnings("unchecked")
+  private Response<Map<String, Object>> doBatchApplyObject(HttpServletRequest httpRequest)
+      throws RestApiException, IOException, PermissionBackendException {
+    TypeToken<List<RevisionInput>> collectionType = new TypeToken<>() {};
+    List<RevisionInput> inputs = readJson(httpRequest, collectionType.getType());
+    IdString id = getProjectName(httpRequest).get();
+    // NOTE(review): payload is really a List of responses; the Map cast below is unchecked.
+    return (Response<Map<String, Object>>)
+        batchApplyObjectAction.apply(parseProjectResource(id), inputs);
+  }
+
+  @SuppressWarnings("unchecked")
   private Response<String> doUpdateHEAD(HttpServletRequest httpRequest) throws Exception {
-    HeadInput input = readJson(httpRequest, TypeLiteral.get(HeadInput.class));
+    HeadInput input = readJson(httpRequest, TypeLiteral.get(HeadInput.class).getType());
     IdString id = getProjectName(httpRequest).get();
 
     return (Response<String>) updateHEADAction.apply(parseProjectResource(id), input);
@@ -243,7 +273,7 @@
   @SuppressWarnings("unchecked")
   private Response<Map<String, Object>> doFetch(HttpServletRequest httpRequest)
       throws IOException, RestApiException, PermissionBackendException {
-    Input input = readJson(httpRequest, TypeLiteral.get(Input.class));
+    Input input = readJson(httpRequest, TypeLiteral.get(Input.class).getType());
     IdString id = getProjectName(httpRequest).get();
 
     return (Response<Map<String, Object>>) fetchAction.apply(parseProjectResource(id), input);
@@ -257,6 +287,16 @@
     return new ProjectResource(project.get(), currentUserProvider.get());
   }
 
+  @SuppressWarnings("unchecked")
+  private Response<Map<String, Object>> doBatchFetch(HttpServletRequest httpRequest)
+      throws IOException, RestApiException, PermissionBackendException {
+    TypeToken<List<Input>> collectionType = new TypeToken<>() {};
+    List<Input> inputs = readJson(httpRequest, collectionType.getType());
+    IdString id = getProjectName(httpRequest).get();
+    // NOTE(review): payload is really a List of responses; the Map cast below is unchecked.
+    return (Response<Map<String, Object>>) batchFetchAction.apply(parseProjectResource(id), inputs);
+  }
+
   private <T> void writeResponse(HttpServletResponse httpResponse, Response<T> response)
       throws IOException {
     String responseJson = gson.toJson(response);
@@ -272,7 +312,7 @@
     }
   }
 
-  private <T> T readJson(HttpServletRequest httpRequest, TypeLiteral<T> typeLiteral)
+  private <T> T readJson(HttpServletRequest httpRequest, Type typeToken)
       throws IOException, BadRequestException {
 
     try (BufferedReader br = httpRequest.getReader();
@@ -286,7 +326,7 @@
           throw new BadRequestException("Expected JSON object", e);
         }
 
-        return gson.fromJson(json, typeLiteral.getType());
+        return gson.fromJson(json, typeToken);
       } finally {
         try {
           // Reader.close won't consume the rest of the input. Explicitly consume the request
@@ -339,10 +379,22 @@
         .endsWith(String.format("/%s~" + APPLY_OBJECTS_API_ENDPOINT, pluginName));
   }
 
+  private boolean isBatchApplyObjectsAction(HttpServletRequest httpRequest) {
+    // True when the request URI ends with "/<pluginName>~batch-apply-object".
+    String expectedSuffix = String.format("/%s~" + BATCH_APPLY_OBJECT_API_ENDPOINT, pluginName);
+    return httpRequest.getRequestURI().endsWith(expectedSuffix);
+  }
+
   private boolean isFetchAction(HttpServletRequest httpRequest) {
     return httpRequest.getRequestURI().endsWith(String.format("/%s~" + FETCH_ENDPOINT, pluginName));
   }
 
+  private boolean isBatchFetchAction(HttpServletRequest httpRequest) {
+    // True when the request URI ends with "/<pluginName>~batch-fetch".
+    String expectedSuffix = String.format("/%s~" + BATCH_FETCH_ENDPOINT, pluginName);
+    return httpRequest.getRequestURI().endsWith(expectedSuffix);
+  }
+
   private boolean isInitProjectAction(HttpServletRequest httpRequest) {
     return httpRequest
         .getRequestURI()
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java
new file mode 100644
index 0000000..7a613c0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.data;
+
+import com.google.auto.value.AutoValue;
+import java.util.Optional;
+
+/**
+ * One ref update of a batch apply-object call: the ref name, the optional {@link RevisionData}
+ * to apply and whether the update is a deletion.
+ */
+@AutoValue
+public abstract class BatchApplyObjectData {
+
+  /**
+   * Builds the value for a single ref, rejecting deletions that carry revision data.
+   *
+   * @param refName name of the ref.
+   * @param revisionData revision data for the ref; must be empty when {@code isDelete} is true.
+   * @param isDelete whether the ref update is a deletion.
+   * @return the new instance.
+   * @throws IllegalArgumentException if {@code isDelete} is true and {@code revisionData} is
+   *     present.
+   */
+  public static BatchApplyObjectData create(
+      String refName, Optional<RevisionData> revisionData, boolean isDelete)
+      throws IllegalArgumentException {
+    boolean deletionCarriesRevision = isDelete && revisionData.isPresent();
+    if (deletionCarriesRevision) {
+      throw new IllegalArgumentException(
+          "DELETE ref-updates cannot be associated with a RevisionData");
+    }
+    return new AutoValue_BatchApplyObjectData(refName, revisionData, isDelete);
+  }
+
+  /** Name of the ref. */
+  public abstract String refName();
+
+  /** Revision data of the ref, if any. */
+  public abstract Optional<RevisionData> revisionData();
+
+  /** Whether the ref update is a deletion. */
+  public abstract boolean isDelete();
+
+  /** Formats as {@code refName:revisionData isDelete=flag}, with {@code ABSENT} for no data. */
+  @Override
+  public String toString() {
+    String revision = revisionData().map(RevisionData::toString).orElse("ABSENT");
+    return String.format("%s:%s isDelete=%s", refName(), revision, isDelete());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 1991260..134ed59 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -19,10 +19,10 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import java.io.IOException;
 import java.util.List;
-import org.apache.http.client.ClientProtocolException;
 import org.eclipse.jgit.transport.URIish;
 
 public interface FetchApiClient {
@@ -33,13 +33,23 @@
 
   HttpResult callFetch(
       Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
-      throws ClientProtocolException, IOException;
+      throws IOException;
 
   default HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
-      throws ClientProtocolException, IOException {
+      throws IOException {
     return callFetch(project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
   }
 
+  HttpResult callBatchFetch(
+      Project.NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      throws IOException;
+
+  default HttpResult callBatchFetch(
+      Project.NameKey project, List<String> refsInBatch, URIish targetUri) throws IOException {
+    return callBatchFetch(
+        project, refsInBatch, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
+  }
+
   HttpResult initProject(Project.NameKey project, URIish uri) throws IOException;
 
   HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException;
@@ -53,7 +63,14 @@
       boolean isDelete,
       RevisionData revisionData,
       URIish targetUri)
-      throws ClientProtocolException, IOException;
+      throws IOException;
+
+  HttpResult callBatchSendObject(
+      NameKey project,
+      List<BatchApplyObjectData> batchApplyObjects,
+      long eventCreatedOn,
+      URIish targetUri)
+      throws IOException;
 
   HttpResult callSendObjects(
       NameKey project,
@@ -61,5 +78,5 @@
       long eventCreatedOn,
       List<RevisionData> revisionData,
       URIish targetUri)
-      throws ClientProtocolException, IOException;
+      throws IOException;
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index b606ba8..c99d4f7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -14,8 +14,10 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
+import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
 import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
@@ -35,6 +37,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionsInput;
@@ -42,13 +45,14 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.ParseException;
 import org.apache.http.auth.AuthenticationException;
 import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
@@ -122,13 +126,81 @@
                 "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
                 instanceId, refName, callAsync),
             StandardCharsets.UTF_8));
-    post.addHeader(new BasicHeader("Content-Type", "application/json"));
+    post.addHeader(new BasicHeader(CONTENT_TYPE, "application/json"));
     post.addHeader(
         PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS,
         Long.toString(startTimeNanos));
     return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
+  /** Renders each ref as a JSON fetch input, grouped under true (async) or false (sync). */
+  private Map<Boolean, List<String>> partitionRefsToAsyncAndSync(List<String> refsInBatch) {
+    String inputTemplate = "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}";
+    return refsInBatch.stream()
+        .collect(
+            Collectors.partitioningBy(
+                ref -> !syncRefsFilter.match(ref),
+                Collectors.mapping(
+                    ref ->
+                        String.format(inputTemplate, instanceId, ref, !syncRefsFilter.match(ref)),
+                    Collectors.toList())));
+  }
+
+  /**
+   * Posts the refs to the batch-fetch endpoint, one request per group: async refs first, then sync
+   * refs. When both groups have refs, the sync request is only sent if the async one succeeded;
+   * otherwise the async result is returned.
+   *
+   * @throws IllegalArgumentException if {@code refsInBatch} yields no ref to fetch.
+   */
+  @Override
+  public HttpResult callBatchFetch(
+      NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      throws IOException {
+    Map<Boolean, List<String>> inputsByAsync = partitionRefsToAsyncAndSync(refsInBatch);
+    List<String> asyncInputs = inputsByAsync.get(true);
+    List<String> syncInputs = inputsByAsync.get(false);
+    boolean hasAsync = !asyncInputs.isEmpty();
+    boolean hasSync = !syncInputs.isEmpty();
+
+    if (!hasAsync && !hasSync) {
+      throw new IllegalArgumentException(
+          "At least one ref should be provided during a batch-fetch operation");
+    }
+
+    String url = formatUrl(targetUri.toString(), project, "batch-fetch");
+
+    if (hasAsync != hasSync) {
+      // Only one group has refs: send it alone, stamped with the caller's start time.
+      String onlyBody = "[" + String.join(",", hasAsync ? asyncInputs : syncInputs) + "]";
+      HttpPost onlyPost = createPostRequest(url, onlyBody, startTimeNanos);
+      return executeRequest(onlyPost, bearerTokenProvider.get(), targetUri);
+    }
+
+    String asyncBody = "[" + String.join(",", asyncInputs) + "]";
+    HttpPost asyncPost = createPostRequest(url, asyncBody, startTimeNanos);
+    HttpResult asyncResult = executeRequest(asyncPost, bearerTokenProvider.get(), targetUri);
+    if (!asyncResult.isSuccessful()) {
+      return asyncResult;
+    }
+
+    // The follow-up sync request gets a fresh start time instead of the caller's.
+    String syncBody = "[" + String.join(",", syncInputs) + "]";
+    long syncStartNanos = MILLISECONDS.toNanos(System.currentTimeMillis());
+    HttpPost syncPost = createPostRequest(url, syncBody, syncStartNanos);
+    return executeRequest(syncPost, bearerTokenProvider.get(), targetUri);
+  }
+
+  /** Builds a JSON POST to {@code url} carrying {@code msgBody} and the start-time header. */
+  private HttpPost createPostRequest(String url, String msgBody, long startTimeNanos) {
+    HttpPost request = new HttpPost(url);
+    request.setEntity(new StringEntity(msgBody, StandardCharsets.UTF_8));
+    request.addHeader(new BasicHeader(CONTENT_TYPE, "application/json"));
+    String startTimeHeader = PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS;
+    request.addHeader(startTimeHeader, Long.toString(startTimeNanos));
+    return request;
+  }
+
   /* (non-Javadoc)
    * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#initProject(com.google.gerrit.entities.Project.NameKey, org.eclipse.jgit.transport.URIish)
    */
@@ -137,7 +209,7 @@
     String url = formatInitProjectUrl(uri.toString(), project);
     HttpPut put = new HttpPut(url);
     put.addHeader(new BasicHeader("Accept", MediaType.ANY_TEXT_TYPE.toString()));
-    put.addHeader(new BasicHeader("Content-Type", MediaType.PLAIN_TEXT_UTF_8.toString()));
+    put.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.PLAIN_TEXT_UTF_8.toString()));
     return executeRequest(put, bearerTokenProvider.get(), uri);
   }
 
@@ -162,7 +234,7 @@
     HttpPut req = new HttpPut(url);
     req.setEntity(
         new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), StandardCharsets.UTF_8));
-    req.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+    req.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.JSON_UTF_8.toString()));
     return executeRequest(req, bearerTokenProvider.get(), apiUri);
   }
 
@@ -177,7 +249,7 @@
       boolean isDelete,
       @Nullable RevisionData revisionData,
       URIish targetUri)
-      throws ClientProtocolException, IOException {
+      throws IOException {
 
     if (!isDelete) {
       requireNonNull(
@@ -191,7 +263,33 @@
 
     HttpPost post = new HttpPost(url);
     post.setEntity(new StringEntity(GSON.toJson(input)));
-    post.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+    post.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.JSON_UTF_8.toString()));
+    return executeRequest(post, bearerTokenProvider.get(), targetUri);
+  }
+
+  /** Sends all refs of the batch in a single POST to the batch-apply-object endpoint. */
+  @Override
+  public HttpResult callBatchSendObject(
+      NameKey project,
+      List<BatchApplyObjectData> batchedRefs,
+      long eventCreatedOn,
+      URIish targetUri)
+      throws IOException {
+    List<RevisionInput> inputs =
+        batchedRefs.stream()
+            .map(
+                ref ->
+                    new RevisionInput(
+                        instanceId, ref.refName(), eventCreatedOn, ref.revisionData().orElse(null)))
+            .collect(Collectors.toList());
+
+    String url = formatUrl(targetUri.toString(), project, "batch-apply-object");
+
+    // StringEntity(String) would encode the body as ISO-8859-1; encode it explicitly as UTF-8 so
+    // that it matches the JSON_UTF_8 Content-Type declared below.
+    HttpPost post = new HttpPost(url);
+    post.setEntity(new StringEntity(GSON.toJson(inputs), StandardCharsets.UTF_8));
+    post.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.JSON_UTF_8.toString()));
     return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
@@ -202,7 +300,7 @@
       long eventCreatedOn,
       List<RevisionData> revisionData,
       URIish targetUri)
-      throws ClientProtocolException, IOException {
+      throws IOException {
     if (revisionData.size() == 1) {
       return callSendObject(
           project, refName, eventCreatedOn, false, revisionData.get(0), targetUri);
@@ -215,7 +313,7 @@
     String url = formatUrl(targetUri.toString(), project, "apply-objects");
     HttpPost post = new HttpPost(url);
     post.setEntity(new StringEntity(GSON.toJson(input)));
-    post.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+    post.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.JSON_UTF_8.toString()));
     return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultUtils.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultUtils.java
new file mode 100644
index 0000000..257e7e4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultUtils.java
@@ -0,0 +1,53 @@
+//
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import com.google.gerrit.entities.Project;
+import java.util.Optional;
+
+/** Static helpers reading an {@code Optional<HttpResult>}, with defaults for an absent result. */
+public final class HttpResultUtils {
+
+  private HttpResultUtils() {
+    // Static utility holder: not meant to be instantiated.
+  }
+
+  /** Returns the result's string form, or {@code "unknown"} when there is none. */
+  public static String status(Optional<HttpResult> maybeResult) {
+    return maybeResult.map(HttpResult::toString).orElse("unknown");
+  }
+
+  /** Returns {@code true} only when a result is present and reports success. */
+  public static boolean isSuccessful(Optional<HttpResult> maybeResult) {
+    return maybeResult.map(HttpResult::isSuccessful).orElse(false);
+  }
+
+  /** Returns {@code true} only when a result is present and reports {@code project} missing. */
+  public static boolean isProjectMissing(
+      Optional<HttpResult> maybeResult, Project.NameKey project) {
+    return maybeResult.map(r -> r.isProjectMissing(project)).orElse(false);
+  }
+
+  /** Returns {@code true} only when a result is present and reports a missing parent object. */
+  public static boolean isParentObjectMissing(Optional<HttpResult> maybeResult) {
+    return maybeResult.map(HttpResult::isParentObjectMissing).orElse(false);
+  }
+
+  /** Returns the result's message, or {@code "unknown"} when result or message is absent. */
+  public static String errorMsg(Optional<HttpResult> maybeResult) {
+    return maybeResult.flatMap(HttpResult::getMessage).orElse("unknown");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
deleted file mode 100644
index a618e16..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull.event;
-
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.index.change.ChangeIndexer;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.replication.pull.Context;
-import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
-
-public class FetchRefReplicatedEventHandler implements EventListener {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private ChangeIndexer changeIndexer;
-
-  @Inject
-  FetchRefReplicatedEventHandler(ChangeIndexer changeIndexer) {
-    this.changeIndexer = changeIndexer;
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    if (event instanceof FetchRefReplicatedEvent && isLocalEvent()) {
-      FetchRefReplicatedEvent fetchRefReplicatedEvent = (FetchRefReplicatedEvent) event;
-      if (!RefNames.isNoteDbMetaRef(fetchRefReplicatedEvent.getRefName())
-          || !fetchRefReplicatedEvent
-              .getStatus()
-              .equals(ReplicationState.RefFetchResult.SUCCEEDED.toString())) {
-        return;
-      }
-
-      Project.NameKey projectNameKey = fetchRefReplicatedEvent.getProjectNameKey();
-      logger.atFine().log(
-          "Indexing ref '%s' for project %s",
-          fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
-      Change.Id changeId = Change.Id.fromRef(fetchRefReplicatedEvent.getRefName());
-      if (changeId != null) {
-        changeIndexer.index(projectNameKey, changeId);
-      } else {
-        logger.atWarning().log(
-            "Couldn't get changeId from refName. Skipping indexing of change %s for project %s",
-            fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
-      }
-    }
-  }
-
-  private boolean isLocalEvent() {
-    return Context.isLocalEvent();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java
deleted file mode 100644
index 675563a..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull.event;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.events.EventListener;
-
-public class FetchRefReplicatedEventModule extends LifecycleModule {
-
-  @Override
-  protected void configure() {
-    DynamicSet.bind(binder(), EventListener.class).to(FetchRefReplicatedEventHandler.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
index 36356e9..3b6b0be 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.replication.pull.fetch;
 
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.restapi.IdString;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
@@ -22,6 +24,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import java.io.IOException;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectInserter;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -43,7 +46,7 @@
   }
 
   public RefUpdateState apply(Project.NameKey name, RefSpec refSpec, RevisionData[] revisionsData)
-      throws MissingParentObjectException, IOException {
+      throws MissingParentObjectException, IOException, ResourceNotFoundException {
     try (Repository git = gitManager.openRepository(name)) {
 
       ObjectId refHead = null;
@@ -87,6 +90,8 @@
         RefUpdate.Result result = ru.update();
         return new RefUpdateState(refSpec.getSource(), result);
       }
+    } catch (RepositoryNotFoundException e) {
+      throw new ResourceNotFoundException(IdString.fromDecoded(name.get()));
     }
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0c3e02c..0c31995 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -584,6 +584,21 @@
 	By default, replicates without matching, i.e. replicates
 	everything from all remotes.
 
+remote.NAME.enableBatchedRefs
+:	Choose whether the batch-apply-object endpoint is enabled.
+	If you set this to `true`, then there will be a single call
+	to the batch-apply-object endpoint with all the refs from
+	the batch ref update included. The default behaviour means
+	one call to the apply object(s) endpoint per ref.
+
+	*NOTE*: the default value is only needed for backwards
+	compatibility to allow migrating transparently to the
+	latest pull-replication plugin version. Once the migration is
+	over, this value should be set to `true` to leverage the
+	performance improvements introduced by the `batch-apply-object` API.
+
+	By default, false.
+
 Directory `replication`
 --------------------
 The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java
deleted file mode 100644
index 1472be2..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull;
-
-import com.google.common.base.Suppliers;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.data.RefUpdateAttribute;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-
-public class FakeGitReferenceUpdatedEvent extends RefUpdatedEvent {
-  FakeGitReferenceUpdatedEvent(
-      Project.NameKey project,
-      String ref,
-      String oldObjectId,
-      String newObjectId,
-      String instanceId) {
-    RefUpdateAttribute upd = new RefUpdateAttribute();
-    upd.newRev = newObjectId;
-    upd.oldRev = oldObjectId;
-    upd.project = project.get();
-    upd.refName = ref;
-    this.refUpdate = Suppliers.ofInstance(upd);
-    this.instanceId = instanceId;
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
similarity index 90%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
index 0ebd0ae..a72a97e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
@@ -29,10 +29,15 @@
     sysModule =
         "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
+public class PullReplicationBatchRefUpdatedAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
 
   @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
+  }
+
+  @Override
   public void setUpTestPlugin() throws Exception {
     FileBasedConfig config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
new file mode 100644
index 0000000..2e727d1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationBatchRefUpdatedIT extends PullReplicationITBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
similarity index 73%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
index 0721dd2..0f6754f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
@@ -17,26 +17,15 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
-import com.google.gerrit.acceptance.SkipProjectClone;
-import com.google.gerrit.acceptance.TestPlugin;
-import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
-import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Supplier;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -45,33 +34,14 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Test;
 
-@SkipProjectClone
-@UseLocalDisk
-@TestPlugin(
-    name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
-    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationFanoutConfigIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+abstract class PullReplicationFanoutConfigBase extends PullReplicationSetupBase {
   private static final int TEST_REPLICATION_DELAY = 60;
-  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
-  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
-  private static final String TEST_REPLICATION_REMOTE = "remote1";
 
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
-  private Path gitPath;
-  private FileBasedConfig config;
   private FileBasedConfig remoteConfig;
-  private FileBasedConfig secureConfig;
 
   @Override
   public void setUpTestPlugin() throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
     remoteConfig =
         new FileBasedConfig(
             sitePaths
@@ -80,17 +50,8 @@
                 .toFile(),
             FS.DETECTED);
 
-    setReplicationSource(
-        TEST_REPLICATION_REMOTE); // Simulates a full replication.config initialization
-
     setRemoteConfig(TEST_REPLICATION_SUFFIX, ALL_PROJECTS);
-
-    secureConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
-    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
-    secureConfig.save();
-
-    super.setUpTestPlugin();
+    super.setUpTestPlugin(false);
   }
 
   @Test
@@ -103,8 +64,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -139,8 +100,8 @@
     RevCommit sourceCommit = pushResult.getCommit();
     final String sourceRef = pushResult.getPatchSet().refName();
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -172,8 +133,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -250,17 +211,12 @@
     waitUntil(() -> sources.getAll().size() == 1);
   }
 
-  private Ref getRef(Repository repo, String branchName) throws Exception {
-    return repo.getRefDatabase().exactRef(branchName);
-  }
-
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    // Only remoteName is used. NOTE(review): callee throws Exception; confirm IOException fits.
+    setReplicationSource(remoteName);
   }
 
   private void setReplicationSource(String remoteName) throws Exception {
@@ -293,23 +249,4 @@
     project.ifPresent(prj -> remoteConfig.setString("remote", null, "projects", prj));
     remoteConfig.save();
   }
-
-  private void setReplicationCredentials(String remoteName, String username, String password)
-      throws Exception {
-    secureConfig.setString("remote", remoteName, "username", username);
-    secureConfig.setString("remote", remoteName, "password", password);
-    secureConfig.save();
-  }
-
-  private void waitUntil(Supplier<Boolean> waitCondition) throws Exception {
-    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
-  }
-
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
new file mode 100644
index 0000000..7babf34
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationFanoutConfigBatchRefUpdateEventIT
+    extends PullReplicationFanoutConfigBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true; // opt the inherited suite into batch ref-update events
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
new file mode 100644
index 0000000..16b0b02
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationFanoutConfigRefUpdatedEventIT extends PullReplicationFanoutConfigBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // keep the inherited suite on single ref-updated events
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index 22b3c86..3afc773 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -34,6 +34,7 @@
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
@@ -108,8 +109,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -151,8 +152,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -223,8 +224,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -250,8 +251,8 @@
     assertThat(pushedRefs).hasSize(1);
     assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK);
 
-    FakeGitReferenceUpdatedEvent forcedPushEvent =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent forcedPushEvent =
+        generateUpdateEvent(
             project,
             newBranch,
             branchRevision,
@@ -292,8 +293,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -336,8 +337,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -437,8 +438,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
similarity index 93%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
index 29bf7e4..b1e140c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
@@ -21,8 +21,6 @@
 import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS;
 
 import com.google.gerrit.acceptance.PushOneCommit.Result;
-import com.google.gerrit.acceptance.SkipProjectClone;
-import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Permission;
@@ -34,6 +32,7 @@
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
 import java.io.IOException;
@@ -54,14 +53,7 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
-@SkipProjectClone
-@UseLocalDisk
-@TestPlugin(
-    name = "pull-replication",
-    sysModule =
-        "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
-    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationIT extends PullReplicationSetupBase {
+abstract class PullReplicationITBase extends PullReplicationSetupBase {
 
   @Override
   protected void setReplicationSource(
@@ -94,8 +86,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -127,8 +119,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -170,8 +162,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -196,8 +188,8 @@
     assertThat(pushedRefs).hasSize(1);
     assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK);
 
-    FakeGitReferenceUpdatedEvent forcedPushEvent =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent forcedPushEvent =
+        generateUpdateEvent(
             project,
             newBranch,
             branchRevision,
@@ -235,8 +227,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -276,8 +268,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -386,8 +378,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
similarity index 90%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
index 0ebd0ae..fba8783 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
@@ -29,10 +29,15 @@
     sysModule =
         "com.googlesource.gerrit.plugins.replication.pull.PullReplicationITAbstract$PullReplicationTestModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
+public class PullReplicationRefUpdatedAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
 
   @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
+  }
+
+  @Override
   public void setUpTestPlugin() throws Exception {
     FileBasedConfig config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
new file mode 100644
index 0000000..6e7c369
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationRefUpdatedIT extends PullReplicationITBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // generateUpdateEvent() then builds a RefUpdatedEvent, not a batch event
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index fea576a..9cb2dc9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -16,13 +16,20 @@
 
 import static java.util.stream.Collectors.toList;
 
+import com.google.common.base.Suppliers;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.data.AccountAttribute;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.inject.Inject;
 import java.io.File;
 import java.io.IOException;
@@ -53,6 +60,22 @@
   protected FileBasedConfig config;
   protected FileBasedConfig secureConfig;
 
+  protected abstract boolean useBatchRefUpdateEvent();
+
+  protected ProjectEvent generateUpdateEvent(
+      Project.NameKey project,
+      String ref,
+      String oldObjectId,
+      String newObjectId,
+      String instanceId) {
+    // Build whichever event flavour the concrete test class has opted into.
+    if (!useBatchRefUpdateEvent()) {
+      return generateRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId);
+    }
+
+    return generateBatchRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId);
+  }
+
   protected void setUpTestPlugin(boolean loadExisting) throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
 
@@ -72,6 +95,9 @@
     setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
     secureConfig.save();
 
+    cfg.setBoolean(
+        "event", "stream-events", "enableBatchRefUpdatedEvents", useBatchRefUpdateEvent());
+
     super.setUpTestPlugin();
   }
 
@@ -119,4 +145,38 @@
       List<String> replicaSuffixes, Function<String, String> toURL) {
     return replicaSuffixes.stream().map(suffix -> toURL.apply(suffix)).collect(toList());
   }
+
+  private BatchRefUpdateEvent generateBatchRefUpdateEvent(
+      Project.NameKey project,
+      String ref,
+      String oldObjectId,
+      String newObjectId,
+      String instanceId) {
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute(); // the single update in the batch
+    refUpdate.project = project.get();
+    refUpdate.refName = ref;
+    refUpdate.oldRev = oldObjectId;
+    refUpdate.newRev = newObjectId;
+    BatchRefUpdateEvent batchEvent =
+        new BatchRefUpdateEvent(
+            project,
+            Suppliers.ofInstance(List.of(refUpdate)),
+            Suppliers.ofInstance(new AccountAttribute(admin.id().get())));
+    batchEvent.instanceId = instanceId;
+    return batchEvent;
+  }
+
+  private ProjectEvent generateRefUpdateEvent(
+      NameKey project, String ref, String oldObjectId, String newObjectId, String instanceId) {
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute(); // the ref change being reported
+    refUpdate.project = project.get();
+    refUpdate.refName = ref;
+    refUpdate.oldRev = oldObjectId;
+    refUpdate.newRev = newObjectId;
+    RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
+    refUpdatedEvent.instanceId = instanceId;
+    refUpdatedEvent.submitter = Suppliers.ofInstance(new AccountAttribute(admin.id().get()));
+    refUpdatedEvent.refUpdate = Suppliers.ofInstance(refUpdate);
+    return refUpdatedEvent;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
similarity index 94%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
index e55e383..2e95ef1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.ProjectEvent;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -36,7 +37,8 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationWithGitHttpTransportProtocolIT extends PullReplicationSetupBase {
+public abstract class PullReplicationWithGitHttpTransportProtocolBase
+    extends PullReplicationSetupBase {
 
   @Override
   protected void setReplicationSource(
@@ -80,8 +82,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
new file mode 100644
index 0000000..8c8ca37
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT
+    extends PullReplicationWithGitHttpTransportProtocolBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true; // generateUpdateEvent() then builds a BatchRefUpdateEvent
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
new file mode 100644
index 0000000..5f3c7b6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationWithGitHttpTransportProtocolRefUpdatedIT
+    extends PullReplicationWithGitHttpTransportProtocolBase {
+
+  @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // generateUpdateEvent() then builds a RefUpdatedEvent, not a batch event
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 603528d..46e2488 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -34,12 +34,13 @@
 import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
-import com.google.gerrit.extensions.common.AccountInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.data.AccountAttribute;
 import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -47,6 +48,7 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
@@ -58,8 +60,12 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.errors.LargeObjectException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,6 +82,10 @@
   private static final String FOREIGN_INSTANCE_ID = "any other instance id";
   private static final String TEST_REF_NAME = "refs/meta/heads/anyref";
 
+  private static final Project.NameKey PROJECT = Project.nameKey("defaultProject");
+  private static final String NEW_OBJECT_ID =
+      ObjectId.fromString("3c1ddc050d7906adb0e29bc3bc46af8749b2f63b").getName();
+
   @Mock private WorkQueue wq;
   @Mock private Source source;
   @Mock private SourcesCollection sourceCollection;
@@ -84,23 +94,27 @@
   @Mock ReplicationStateListeners sl;
   @Mock FetchRestApiClient fetchRestApiClient;
   @Mock FetchApiClient.Factory fetchClientFactory;
-  @Mock AccountInfo accountInfo;
+  @Mock AccountAttribute accountAttribute;
   @Mock RevisionReader revReader;
   @Mock RevisionData revisionData;
   @Mock HttpResult successfulHttpResult;
   @Mock HttpResult fetchHttpResult;
+  @Mock HttpResult batchFetchHttpResult;
   @Mock RevisionData revisionDataWithParents;
   List<ObjectId> revisionDataParentObjectIds;
   @Mock HttpResult httpResult;
+  @Mock HttpResult batchHttpResult;
   @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter;
+
+  @Mock Config config;
   ApplyObjectMetrics applyObjectMetrics;
-  FetchReplicationMetrics fetchMetrics;
   ReplicationQueueMetrics queueMetrics;
   ShutdownState shutdownState;
 
   @Captor ArgumentCaptor<String> stringCaptor;
   @Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor;
   @Captor ArgumentCaptor<List<RevisionData>> revisionsDataCaptor;
+  @Captor ArgumentCaptor<List<BatchApplyObjectData>> batchRefsCaptor;
 
   private ExcludedRefsFilter refsFilter;
   private ReplicationQueue objectUnderTest;
@@ -119,8 +133,11 @@
     when(source.wouldFetchRef(anyString())).thenReturn(true);
     ImmutableList<String> apis = ImmutableList.of("http://localhost:18080");
     when(source.getApis()).thenReturn(apis);
+    when(source.enableBatchedRefs()).thenReturn(true);
     when(sourceCollection.getAll()).thenReturn(Lists.newArrayList(source));
     when(rd.get()).thenReturn(sourceCollection);
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(true);
     lenient()
         .when(revReader.read(any(), any(), anyString(), eq(0)))
         .thenReturn(Optional.of(revisionData));
@@ -146,16 +163,22 @@
     lenient()
         .when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any()))
         .thenReturn(httpResult);
+    lenient()
+        .when(fetchRestApiClient.callBatchSendObject(any(), any(), anyLong(), any()))
+        .thenReturn(batchHttpResult);
     when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(fetchHttpResult);
+    when(fetchRestApiClient.callBatchFetch(any(), any(), any())).thenReturn(batchFetchHttpResult);
     when(fetchRestApiClient.initProject(any(), any())).thenReturn(successfulHttpResult);
     when(successfulHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isSuccessful()).thenReturn(true);
+    when(batchHttpResult.isSuccessful()).thenReturn(true);
     when(fetchHttpResult.isSuccessful()).thenReturn(true);
+    when(batchFetchHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isProjectMissing(any())).thenReturn(false);
+    when(batchHttpResult.isProjectMissing(any())).thenReturn(false);
     when(applyObjectsRefsFilter.match(any())).thenReturn(false);
 
     applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker());
-    fetchMetrics = new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
     queueMetrics = new ReplicationQueueMetrics("pull-replication", new DisabledMetricMaker());
     shutdownState = new ShutdownState();
 
@@ -169,38 +192,65 @@
             refsFilter,
             () -> revReader,
             applyObjectMetrics,
-            fetchMetrics,
             queueMetrics,
             LOCAL_INSTANCE_ID,
+            config,
             applyObjectsRefsFilter,
             shutdownState);
   }
 
   @Test
-  public void shouldCallSendObjectWhenMetaRef() throws Exception {
+  public void shouldCallBatchSendObjectWhenMetaRef() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any());
+  }
+
+  @Test
+  public void shouldCallSendObjectWhenMetaRefAndRefUpdateEvent() throws IOException {
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(false);
+
+    objectUnderTest =
+        new ReplicationQueue(
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            () -> revReader,
+            applyObjectMetrics,
+            queueMetrics,
+            LOCAL_INSTANCE_ID,
+            config,
+            applyObjectsRefsFilter,
+            shutdownState);
+
     Event event = new TestEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any());
   }
 
   @Test
   public void shouldIgnoreEventWhenIsNotLocalInstanceId() throws Exception {
-    Event event = new TestEvent();
+    Event event = generateBatchRefUpdateEvent(TEST_REF_NAME);
     event.instanceId = FOREIGN_INSTANCE_ID;
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient, never())
-        .callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient, never()).callBatchSendObject(any(), any(), anyLong(), any());
   }
 
   @Test
   public void shouldCallInitProjectWhenProjectIsMissing() throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/meta");
-    when(httpResult.isSuccessful()).thenReturn(false);
-    when(httpResult.isProjectMissing(any())).thenReturn(true);
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isProjectMissing(any())).thenReturn(true);
     when(source.isCreateMissingRepositories()).thenReturn(true);
 
     objectUnderTest.start();
@@ -210,10 +260,28 @@
   }
 
   @Test
-  public void shouldNotCallInitProjectWhenReplicateNewRepositoriesNotSet() throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/meta");
-    when(httpResult.isSuccessful()).thenReturn(false);
-    when(httpResult.isProjectMissing(any())).thenReturn(true);
+  public void shouldCallSendObjectReorderingRefsHavingMetaAtTheEnd() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta", "refs/changes/01/1/1");
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+    verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta");
+  }
+
+  @Test
+  public void shouldCallSendObjectKeepingMetaAtTheEnd() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/01/1/meta");
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+    verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta");
+  }
+
+  @Test
+  public void shouldNotCallInitProjectWhenReplicateNewRepositoriesNotSet() throws IOException {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isProjectMissing(any())).thenReturn(true);
+    lenient().when(httpResult.isSuccessful()).thenReturn(false);
+    lenient().when(httpResult.isProjectMissing(any())).thenReturn(true);
     when(source.isCreateMissingRepositories()).thenReturn(false);
 
     objectUnderTest.start();
@@ -223,105 +291,203 @@
   }
 
   @Test
-  public void shouldCallSendObjectWhenPatchSetRef() throws Exception {
+  public void shouldCallBatchSendObjectWhenPatchSetRefAndRefUpdateEvent() throws Exception {
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(false);
+
+    objectUnderTest =
+        new ReplicationQueue(
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            () -> revReader,
+            applyObjectMetrics,
+            queueMetrics,
+            LOCAL_INSTANCE_ID,
+            config,
+            applyObjectsRefsFilter,
+            shutdownState);
+
     Event event = new TestEvent("refs/changes/01/1/1");
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any());
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenIOException() throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+  public void shouldCallBatchSendObjectWhenPatchSetRef() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any());
+  }
+
+  @Test
+  public void shouldFallbackToCallBatchFetchWhenIOException()
+      throws IOException, LargeObjectException, RefUpdateException {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
 
     when(revReader.read(any(), any(), anyString(), anyInt())).thenThrow(IOException.class);
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenLargeRef() throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/1");
+  public void shouldFallbackToCallBatchFetchWhenLargeRef() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
 
     when(revReader.read(any(), any(), anyString(), anyInt())).thenReturn(Optional.empty());
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
-  public void shouldFallbackToCallFetchWhenParentObjectIsMissing() throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/1");
+  public void
+      shouldFallbackToCallBatchFetchWhenParentObjectIsMissingAndRefDoesntMatchApplyObjectsRefsFilter()
+          throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
 
-    when(httpResult.isSuccessful()).thenReturn(false);
-    when(httpResult.isParentObjectMissing()).thenReturn(true);
-    when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any()))
-        .thenReturn(httpResult);
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(true);
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
+  }
+
+  @Test
+  public void
+      shouldFallbackToApplyObjectsForEachRefWhenParentObjectIsMissingAndRefMatchesApplyObjectsRefFilter()
+          throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/02/1/1");
+    objectUnderTest.start();
+
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(true);
+    when(applyObjectsRefsFilter.match(any())).thenReturn(true);
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient, times(2))
+        .callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient, never()).callFetch(any(), anyString(), any());
+  }
+
+  @Test
+  public void shouldFallbackToCallBatchFetchWhenParentObjectNotMissingButApplyObjectFails()
+      throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    objectUnderTest.start();
+
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(false);
+    lenient().when(httpResult.isSuccessful()).thenReturn(false);
+    lenient().when(httpResult.isParentObjectMissing()).thenReturn(false);
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient).callBatchFetch(any(), any(), any());
   }
 
   @Test
   public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnMetaRef()
       throws Exception {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
 
-    when(httpResult.isSuccessful()).thenReturn(false, true);
-    when(httpResult.isParentObjectMissing()).thenReturn(true, false);
-    when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any()))
-        .thenReturn(httpResult);
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(true);
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient, times(2))
+    verify(fetchRestApiClient, times(1))
         .callSendObjects(any(), anyString(), anyLong(), revisionsDataCaptor.capture(), any());
     List<List<RevisionData>> revisionsDataValues = revisionsDataCaptor.getAllValues();
-    assertThat(revisionsDataValues).hasSize(2);
+    assertThat(revisionsDataValues).hasSize(1);
 
     List<RevisionData> firstRevisionsValues = revisionsDataValues.get(0);
-    assertThat(firstRevisionsValues).hasSize(1);
+    assertThat(firstRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size());
     assertThat(firstRevisionsValues).contains(revisionData);
-
-    List<RevisionData> secondRevisionsValues = revisionsDataValues.get(1);
-    assertThat(secondRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size());
   }
 
   @Test
   public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnAllowedRefs()
       throws Exception {
     String refName = "refs/tags/test-tag";
-    Event event = new TestEvent(refName);
+    Event event = generateBatchRefUpdateEvent(refName);
     objectUnderTest.start();
 
-    when(httpResult.isSuccessful()).thenReturn(false, true);
-    when(httpResult.isParentObjectMissing()).thenReturn(true, false);
-    when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any()))
-        .thenReturn(httpResult);
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(true);
     when(applyObjectsRefsFilter.match(refName)).thenReturn(true);
 
     objectUnderTest.onEvent(event);
 
-    verify(fetchRestApiClient, times(2))
+    verify(fetchRestApiClient, times(1))
         .callSendObjects(any(), anyString(), anyLong(), revisionsDataCaptor.capture(), any());
     List<List<RevisionData>> revisionsDataValues = revisionsDataCaptor.getAllValues();
-    assertThat(revisionsDataValues).hasSize(2);
+    assertThat(revisionsDataValues).hasSize(1);
 
     List<RevisionData> firstRevisionsValues = revisionsDataValues.get(0);
-    assertThat(firstRevisionsValues).hasSize(1);
+    assertThat(firstRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size());
     assertThat(firstRevisionsValues).contains(revisionData);
+  }
 
-    List<RevisionData> secondRevisionsValues = revisionsDataValues.get(1);
-    assertThat(secondRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size());
+  @Test
+  public void shouldCallSendObjectsIfBatchedRefsNotEnabledAtSource() throws Exception {
+    when(source.enableBatchedRefs()).thenReturn(false);
+    Event batchEvent = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    objectUnderTest.start();
+    objectUnderTest.onEvent(batchEvent);
+    // The non-batch send is expected exactly once; the batch send must not happen at all.
+    verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient, never()).callBatchSendObject(any(), any(), anyLong(), any());
+  }
+
+  @Test
+  public void shouldCallFetchIfBatchedRefsNotEnabledAtSource() throws Exception {
+    // Batching is off and httpResult reports a failure that is not a missing parent object.
+    when(source.enableBatchedRefs()).thenReturn(false);
+    when(httpResult.isParentObjectMissing()).thenReturn(false);
+    when(httpResult.isSuccessful()).thenReturn(false);
+    Event batchEvent = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+
+    objectUnderTest.start();
+    objectUnderTest.onEvent(batchEvent);
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verify(fetchRestApiClient, never()).callBatchFetch(any(), any(), any());
+  }
+
+  @Test
+  public void shouldCallBatchFetchForAllTheRefsInTheBatchIfApplyObjectFails() throws Exception {
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/02/1/1");
+    when(batchHttpResult.isSuccessful()).thenReturn(false);
+    when(batchHttpResult.isParentObjectMissing()).thenReturn(true);
+    when(applyObjectsRefsFilter.match(any())).thenReturn(true, true);
+    when(httpResult.isSuccessful()).thenReturn(true, false);
+
+    objectUnderTest.start();
+    objectUnderTest.onEvent(event);
+
+    verify(fetchRestApiClient, times(2))
+        .callSendObjects(any(), anyString(), anyLong(), any(), any());
+    verify(fetchRestApiClient)
+        .callBatchFetch(
+            PROJECT,
+            List.of("refs/changes/01/1/1", "refs/changes/02/1/1"),
+            new URIish("http://localhost:18080"));
   }
 
   @Test
@@ -343,15 +509,15 @@
             refsFilter,
             () -> revReader,
             applyObjectMetrics,
-            fetchMetrics,
             queueMetrics,
             LOCAL_INSTANCE_ID,
+            config,
             applyObjectsRefsFilter,
             shutdownState);
-    Event event = new TestEvent("refs/multi-site/version");
+    Event event = generateBatchRefUpdateEvent("refs/multi-site/version");
     objectUnderTest.onEvent(event);
 
-    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute);
   }
 
   @Test
@@ -362,10 +528,10 @@
 
   @Test
   public void shouldSkipEventWhenStarredChangesRef() {
-    Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
+    Event event = generateBatchRefUpdateEvent("refs/starred-changes/41/2941/1000000");
     objectUnderTest.onEvent(event);
 
-    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute);
   }
 
   @Test
@@ -428,6 +594,36 @@
     return createTempDirectory(prefix);
   }
 
+  private BatchRefUpdateEvent generateBatchRefUpdateEvent(String... refs) {
+    List<RefUpdateAttribute> attributes =
+        Arrays.stream(refs)
+            .map(
+                refName -> {
+                  RefUpdateAttribute attribute = new RefUpdateAttribute();
+                  attribute.refName = refName;
+                  attribute.project = PROJECT.get();
+                  attribute.oldRev = ObjectId.zeroId().getName();
+                  attribute.newRev = NEW_OBJECT_ID;
+                  return attribute;
+                })
+            .collect(Collectors.toList());
+    BatchRefUpdateEvent batchEvent =
+        new BatchRefUpdateEvent(
+            PROJECT, Suppliers.ofInstance(attributes), Suppliers.ofInstance(accountAttribute));
+    // Mark the event as produced by the local instance.
+    batchEvent.instanceId = LOCAL_INSTANCE_ID;
+    return batchEvent;
+  }
+
+  private void verifySendObjectOrdering(String firstRef, String secondRef) throws IOException {
+    verify(fetchRestApiClient)
+        .callBatchSendObject(any(), batchRefsCaptor.capture(), anyLong(), any());
+    List<BatchApplyObjectData> batchRefs = batchRefsCaptor.getValue();
+    // containsExactly(...).inOrder() pins the batch size as well as the ref order.
+    assertThat(batchRefs.stream().map(BatchApplyObjectData::refName).collect(Collectors.toList()))
+        .containsExactly(firstRef, secondRef).inOrder();
+  }
+
   private class TestEvent extends RefUpdatedEvent {
 
     public TestEvent() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
index 245003a..4eda6b0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
@@ -27,6 +27,7 @@
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.server.project.ProjectResource;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -210,6 +211,19 @@
     applyObjectAction.apply(projectResource, inputParams);
   }
 
+  @Test(expected = ResourceNotFoundException.class)
+  public void shouldRethrowResourceNotFoundException()
+      throws RestApiException, IOException, RefUpdateException, MissingParentObjectException {
+    // The command is stubbed to fail for any arguments; the expected exception is the same type.
+    ResourceNotFoundException notFound = new ResourceNotFoundException("test_projects");
+    doThrow(notFound)
+        .when(applyObjectCommand)
+        .applyObject(any(), anyString(), any(), anyString(), anyLong());
+    RevisionInput revisionInput =
+        new RevisionInput(label, refName, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    applyObjectAction.apply(projectResource, revisionInput);
+  }
+
   private RevisionData createSampleRevisionData() {
     RevisionObjectData commitData =
         new RevisionObjectData(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
index 2d26b92..446902e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -104,7 +105,7 @@
   @Test
   public void shouldSendEventWhenApplyObject()
       throws PermissionBackendException, IOException, RefUpdateException,
-          MissingParentObjectException {
+          MissingParentObjectException, ResourceNotFoundException {
     RevisionData sampleRevisionData =
         createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
     objectUnderTest.applyObject(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java
new file mode 100644
index 0000000..d8dabf1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java
@@ -0,0 +1,214 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.gerrit.extensions.restapi.MergeConflictException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.project.ProjectResource;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import java.util.Collections;
+import java.util.List;
+import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jgit.lib.Constants;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BatchApplyObjectActionTest {
+
+  private static final long DUMMY_EVENT_TIMESTAMP = 1684875939;
+
+  private BatchApplyObjectAction batchApplyObjectAction;
+  private static final String LABEL = "instance-2-label";
+  private static final String REF_NAME = "refs/heads/master";
+  private static final String REF_META_NAME = "refs/meta/version";
+  private static final String SAMPLE_COMMIT_OBJECT_ID = "9f8d52853089a3cf00c02ff7bd0817bd4353a95a";
+  private static final String SAMPLE_TREE_OBJECT_ID = "4b825dc642cb6eb9a060e54bf8d69288fbee4904";
+
+  private static final String SAMPLE_COMMIT_CONTENT =
+      "tree "
+          + SAMPLE_TREE_OBJECT_ID
+          + "\n"
+          + "parent 20eb48d28be86dc88fb4bef747f08de0fbefe12d\n"
+          + "author Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac> 1610471611 +0100\n"
+          + "committer Gerrit Code Review <root@maczech-XPS-15> 1610471611 +0100\n"
+          + "\n"
+          + "Update patch set 1\n"
+          + "\n"
+          + "Change has been successfully merged by Administrator\n"
+          + "\n"
+          + "Patch-set: 1\n"
+          + "Status: merged\n"
+          + "Tag: autogenerated:gerrit:merged\n"
+          + "Reviewer: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>\n"
+          + "Label: SUBM=+1\n"
+          + "Submission-id: 1904-1610471611558-783c0a2f\n"
+          + "Submitted-with: OK\n"
+          + "Submitted-with: OK: Code-Review: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>";
+
+  @Mock private ApplyObjectAction applyObjectAction;
+  @Mock private ProjectResource projectResource;
+
+  @Before
+  public void setup() {
+    batchApplyObjectAction = new BatchApplyObjectAction(applyObjectAction);
+  }
+
+  @Test
+  public void shouldDelegateToApplyObjectActionForEveryRevision() throws RestApiException {
+    RevisionInput masterInput =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    RevisionInput fooInput =
+        new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    List<RevisionInput> batch = List.of(masterInput, fooInput);
+    batchApplyObjectAction.apply(projectResource, batch);
+    // Each revision of the batch must reach the delegate action exactly once.
+    verify(applyObjectAction).apply(projectResource, masterInput);
+    verify(applyObjectAction).apply(projectResource, fooInput);
+  }
+
+  @Test
+  public void shouldReturnOkResponseCodeWhenAllRevisionsAreProcessedSuccessfully()
+      throws RestApiException {
+    RevisionInput masterInput =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    RevisionInput fooInput =
+        new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    // Both delegated applies answer with their own "created" response.
+    when(applyObjectAction.apply(projectResource, fooInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> Response.created(fooInput));
+    when(applyObjectAction.apply(projectResource, masterInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> Response.created(masterInput));
+
+    Response<?> batchResponse =
+        batchApplyObjectAction.apply(projectResource, List.of(masterInput, fooInput));
+    assertThat(batchResponse.statusCode()).isEqualTo(SC_OK);
+  }
+
+  @Test
+  public void shouldReturnAListWithAllTheRevisionsInResponseBodyOnSuccess()
+      throws RestApiException {
+    RevisionInput masterInput =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    Response<?> masterCreated = Response.created(masterInput);
+    RevisionInput fooInput =
+        new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    Response<?> fooCreated = Response.created(fooInput);
+    when(applyObjectAction.apply(projectResource, masterInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> masterCreated);
+    when(applyObjectAction.apply(projectResource, fooInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> fooCreated);
+
+    List<RevisionInput> batch = List.of(masterInput, fooInput);
+    Response<?> batchResponse = batchApplyObjectAction.apply(projectResource, batch);
+    // The response body is expected to list the per-revision responses in input order.
+    assertThat((List<Response<?>>) batchResponse.value())
+        .isEqualTo(List.of(masterCreated, fooCreated));
+  }
+
+  @Test
+  public void shouldAcceptAMixOfCreatesAndDeletes() throws RestApiException {
+    // A null revision payload is what this test uses to model a ref deletion.
+    RevisionInput deletion = new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, null);
+    Response<?> noContent = Response.withStatusCode(HttpServletResponse.SC_NO_CONTENT, "");
+    RevisionInput creation =
+        new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    Response<?> created = Response.created(creation);
+    when(applyObjectAction.apply(projectResource, deletion))
+        .thenAnswer((Answer<Response<?>>) ignored -> noContent);
+    when(applyObjectAction.apply(projectResource, creation))
+        .thenAnswer((Answer<Response<?>>) ignored -> created);
+
+    List<RevisionInput> batch = List.of(deletion, creation);
+    Response<?> batchResponse = batchApplyObjectAction.apply(projectResource, batch);
+    assertThat((List<Response<?>>) batchResponse.value())
+        .isEqualTo(List.of(noContent, created));
+  }
+
+  @Test
+  public void shouldReturnOneOkCodeEvenIfInputContainsBothCreatesAndDeletes()
+      throws RestApiException {
+    // The delegate mock is left unstubbed here; only the batch-level status code is checked.
+    RevisionInput deletion =
+        new RevisionInput(LABEL, REF_META_NAME, DUMMY_EVENT_TIMESTAMP + 1, null);
+    RevisionInput creation =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+
+    Response<?> batchResponse =
+        batchApplyObjectAction.apply(projectResource, List.of(creation, deletion));
+    assertThat(batchResponse.statusCode()).isEqualTo(SC_OK);
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowARestApiExceptionIfProcessingFailsForAnyOfTheRevisions()
+      throws RestApiException {
+    RevisionInput failing =
+        new RevisionInput(LABEL, "bad", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    RevisionInput succeeding =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    when(applyObjectAction.apply(projectResource, failing))
+        .thenThrow(new MergeConflictException("BOOM"));
+    when(applyObjectAction.apply(projectResource, succeeding))
+        .thenAnswer((Answer<Response<?>>) ignored -> Response.created(succeeding));
+
+    // The failing revision is placed second, after one that is stubbed to succeed.
+    batchApplyObjectAction.apply(projectResource, List.of(succeeding, failing));
+  }
+
+  @Test
+  public void shouldStopProcessingWhenAFailureOccurs() throws RestApiException {
+    RevisionInput good =
+        new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    RevisionInput bad =
+        new RevisionInput(LABEL, "bad", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData());
+    when(applyObjectAction.apply(projectResource, bad))
+        .thenThrow(new MergeConflictException("BOOM"));
+    // Fail explicitly when nothing is thrown; otherwise the test would pass vacuously.
+    try {
+      batchApplyObjectAction.apply(projectResource, List.of(bad, good));
+      throw new AssertionError("Expected MergeConflictException for the first revision");
+    } catch (MergeConflictException expected) {
+      verify(applyObjectAction, never()).apply(projectResource, good);
+    }
+  }
+
+  private RevisionData createSampleRevisionData() {
+    // Sample commit object paired with an empty-content tree object.
+    byte[] commitBytes = SAMPLE_COMMIT_CONTENT.getBytes();
+    byte[] emptyTree = new byte[0];
+    return createSampleRevisionData(
+        new RevisionObjectData(SAMPLE_COMMIT_OBJECT_ID, Constants.OBJ_COMMIT, commitBytes),
+        new RevisionObjectData(SAMPLE_TREE_OBJECT_ID, Constants.OBJ_TREE, emptyTree));
+  }
+
+  private RevisionData createSampleRevisionData(
+      RevisionObjectData commitData, RevisionObjectData treeData) {
+    return new RevisionData(Collections.emptyList(), commitData, treeData, Lists.newArrayList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
new file mode 100644
index 0000000..738815a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.http.HttpStatus.SC_OK;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.restapi.MergeConflictException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.project.ProjectResource;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BatchFetchActionTest {
+
+  BatchFetchAction batchFetchAction;
+  String label = "instance-2-label";
+  String master = "refs/heads/master";
+  String test = "refs/heads/test";
+
+  @Mock ProjectResource projectResource;
+  @Mock FetchAction fetchAction;
+
+  @Before
+  public void setup() {
+    batchFetchAction = new BatchFetchAction(fetchAction);
+  }
+
+  @Test
+  public void shouldDelegateToFetchActionForEveryFetchInput() throws RestApiException {
+    FetchAction.Input masterInput = createInput(master);
+    FetchAction.Input testInput = createInput(test);
+    List<FetchAction.Input> batch = List.of(masterInput, testInput);
+    batchFetchAction.apply(projectResource, batch);
+    // Every input of the batch must reach the delegate fetch action exactly once.
+    verify(fetchAction).apply(projectResource, masterInput);
+    verify(fetchAction).apply(projectResource, testInput);
+  }
+
+  @Test
+  public void shouldReturnOkResponseCodeWhenAllInputsAreProcessedSuccessfully()
+      throws RestApiException {
+    // Any delegated fetch answers with an "accepted" response.
+    when(fetchAction.apply(any(), any()))
+        .thenAnswer((Answer<Response<?>>) ignored -> Response.accepted("some-url"));
+    List<FetchAction.Input> batch = List.of(createInput(master), createInput(test));
+
+    Response<?> batchResponse = batchFetchAction.apply(projectResource, batch);
+
+    assertThat(batchResponse.statusCode()).isEqualTo(SC_OK);
+  }
+
+  @Test
+  public void shouldReturnAListWithAllResponsesOnSuccess() throws RestApiException {
+    FetchAction.Input masterInput = createInput(master);
+    Response.Accepted masterAccepted = Response.accepted("master-url");
+    FetchAction.Input testInput = createInput(test);
+    Response.Accepted testAccepted = Response.accepted("test-url");
+
+    when(fetchAction.apply(projectResource, masterInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> masterAccepted);
+    when(fetchAction.apply(projectResource, testInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> testAccepted);
+
+    List<FetchAction.Input> batch = List.of(masterInput, testInput);
+    Response<?> batchResponse = batchFetchAction.apply(projectResource, batch);
+    // The response body is expected to list the per-input responses in input order.
+    assertThat((List<Response<?>>) batchResponse.value())
+        .isEqualTo(List.of(masterAccepted, testAccepted));
+  }
+
+  @Test
+  public void shouldReturnAMixOfSyncAndAsyncResponses() throws RestApiException {
+    // "accepted" stands in for the async response here, "created" for the sync one.
+    FetchAction.Input asyncInput = createInput(master);
+    Response.Accepted accepted = Response.accepted("master-url");
+    FetchAction.Input syncInput = createInput(test);
+    Response<?> created = Response.created(syncInput);
+    when(fetchAction.apply(projectResource, asyncInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> accepted);
+    when(fetchAction.apply(projectResource, syncInput))
+        .thenAnswer((Answer<Response<?>>) ignored -> created);
+
+    List<FetchAction.Input> batch = List.of(asyncInput, syncInput);
+    Response<?> batchResponse = batchFetchAction.apply(projectResource, batch);
+
+    assertThat((List<Response<?>>) batchResponse.value()).isEqualTo(List.of(accepted, created));
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenProcessingFailsForAnInput() throws RestApiException {
+    FetchAction.Input working = createInput(master);
+    FetchAction.Input broken = createInput(test);
+    List<FetchAction.Input> batch = List.of(working, broken);
+
+    // The second input is stubbed to fail; the first one is stubbed to be accepted.
+    when(fetchAction.apply(projectResource, broken)).thenThrow(new MergeConflictException("BOOM"));
+    when(fetchAction.apply(projectResource, working))
+        .thenAnswer((Answer<Response<?>>) ignored -> Response.accepted("master-url"));
+    batchFetchAction.apply(projectResource, batch);
+  }
+
+  private FetchAction.Input createInput(String refName) {
+    FetchAction.Input fetchInput = new FetchAction.Input();
+    fetchInput.refName = refName;
+    fetchInput.label = this.label;
+    return fetchInput;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
index 153a549..723ad12 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -85,6 +85,11 @@
   }
 
   @Test
+  public void shouldAuthenticateWhenBatchFetch() throws Exception {
+    authenticateAndFilter("any-prefix/pull-replication~batch-fetch", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
   public void shouldAuthenticateWhenApplyObject() throws Exception {
     authenticateAndFilter("any-prefix/pull-replication~apply-object", NO_QUERY_PARAMETERS);
   }
@@ -95,6 +100,11 @@
   }
 
   @Test
+  public void shouldAuthenticateWhenBatchApplyObject() throws Exception {
+    authenticateAndFilter("any-prefix/pull-replication~batch-apply-object", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
   public void shouldAuthenticateWhenDeleteProject() throws Exception {
     authenticateAndFilter("any-prefix/pull-replication~delete-project", NO_QUERY_PARAMETERS);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
index 7c7846c..3bd9d35 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
+import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allowCapability;
 
 import com.google.gerrit.acceptance.config.GerritConfig;
@@ -181,6 +182,50 @@
             assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldRemoveFromTheProjectCacheWhenProjectIsSuccessfullyDeleted() throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithAuthenticationPrefix(testProjectName);
+    assertThat(projectCache.get(project).isPresent()).isTrue();
+    // The project is cached at this point; once the delete returns 200 it must no longer be.
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
+    assertThat(projectCache.get(project).isPresent()).isFalse();
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  @GerritConfig(name = "container.replica", value = "true")
+  public void shouldRemoveFromTheReplicaProjectCacheWhenProjectIsSuccessfullyDeletedFromTheReplica()
+      throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithAuthenticationPrefix(testProjectName);
+    assertThat(projectCache.get(project).isPresent()).isTrue();
+    // Same scenario with container.replica=true: a 200 delete must also evict the cache entry.
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
+    assertThat(projectCache.get(project).isPresent()).isFalse();
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldNotRemoveFromTheReplicaCacheIfAProjectCannotBeDeleted() throws Exception {
+    assertThat(projectCache.get(project).isPresent()).isTrue();
+    httpClientFactory
+        .create(source)
+        .execute(
+            createDeleteRequest(), assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED));
+    assertThat(projectCache.get(project).isPresent()).isTrue();
+  }
+
   @Override
   protected String getURLWithAuthenticationPrefix(String projectName) {
     return String.format(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
index 5ede668..e2afe92 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
@@ -9,7 +9,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.atLeastOnce;
-import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import com.google.common.net.MediaType;
 import com.google.gerrit.entities.Project;
@@ -40,8 +39,10 @@
   @Mock HttpServletResponse response;
   @Mock FilterChain filterChain;
   @Mock private FetchAction fetchAction;
+  @Mock private BatchFetchAction batchFetchAction;
   @Mock private ApplyObjectAction applyObjectAction;
   @Mock private ApplyObjectsAction applyObjectsAction;
+  @Mock private BatchApplyObjectAction batchApplyObjectAction;
   @Mock private ProjectInitializationAction projectInitializationAction;
   @Mock private UpdateHeadAction updateHEADAction;
   @Mock private ProjectDeletionAction projectDeletionAction;
@@ -56,12 +57,17 @@
   private final String PROJECT_NAME_GIT = "some-project.git";
   private final String FETCH_URI =
       String.format("any-prefix/projects/%s/%s~fetch", PROJECT_NAME, PLUGIN_NAME);
+  private final String BATCH_FETCH_URI =
+      String.format("any-prefix/projects/%s/%s~batch-fetch", PROJECT_NAME, PLUGIN_NAME);
   private final String APPLY_OBJECT_URI =
       String.format("any-prefix/projects/%s/%s~apply-object", PROJECT_NAME, PLUGIN_NAME);
   private final String APPLY_OBJECTS_URI =
       String.format("any-prefix/projects/%s/%s~apply-objects", PROJECT_NAME, PLUGIN_NAME);
   private final String HEAD_URI =
       String.format("any-prefix/projects/%s/%s~HEAD", PROJECT_NAME, PLUGIN_NAME);
+
+  private final String BATCH_APPLY_OBJECT_URI =
+      String.format("any-prefix/projects/%s/%s~batch-apply-object", PROJECT_NAME, PLUGIN_NAME);
   private final String DELETE_PROJECT_URI =
       String.format("any-prefix/projects/%s/%s~delete-project", PROJECT_NAME, PLUGIN_NAME);
   private final String INIT_PROJECT_URI =
@@ -76,8 +82,10 @@
   private PullReplicationFilter createPullReplicationFilter(CurrentUser currentUser) {
     return new PullReplicationFilter(
         fetchAction,
+        batchFetchAction,
         applyObjectAction,
         applyObjectsAction,
+        batchApplyObjectAction,
         projectInitializationAction,
         updateHEADAction,
         projectDeletionAction,
@@ -125,6 +133,31 @@
   }
 
   @Test
+  public void shouldFilterBatchFetchAction() throws Exception {
+    byte[] payloadBatchFetch =
+        ("[{"
+                + "\"label\":\"Replication\", "
+                + "\"ref_name\": \"refs/heads/master\", "
+                + "\"async\":false"
+                + "},"
+                + "{"
+                + "\"label\":\"Replication\", "
+                + "\"ref_name\": \"refs/heads/test\", "
+                + "\"async\":false"
+                + "}]")
+            .getBytes(StandardCharsets.UTF_8);
+    // Two fetch inputs in one JSON array; the filter must hand them to batchFetchAction once.
+    defineBehaviours(payloadBatchFetch, BATCH_FETCH_URI);
+    when(batchFetchAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(batchFetchAction).apply(any(ProjectResource.class), any());
+  }
+
+  @Test
   public void shouldFilterApplyObjectAction() throws Exception {
 
     byte[] payloadApplyObject =
@@ -180,7 +213,6 @@
     final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
     pullReplicationFilter.doFilter(request, response, filterChain);
 
-    verify(request, times(5)).getRequestURI();
     verify(projectInitializationAction).initProject(eq(PROJECT_NAME_GIT));
     verify(response).getWriter();
   }
@@ -211,7 +243,6 @@
     final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
     pullReplicationFilter.doFilter(request, response, filterChain);
 
-    verify(request, times(7)).getRequestURI();
     verify(projectCache).get(Project.nameKey(PROJECT_NAME));
     verify(projectDeletionAction).apply(any(ProjectResource.class), any());
     verify(response).getWriter();
@@ -267,7 +298,7 @@
   }
 
   @Test
-  public void shouldBe500WhenResourceNotFound() throws Exception {
+  public void shouldBe404WhenResourceNotFound() throws Exception {
     when(request.getRequestURI()).thenReturn(DELETE_PROJECT_URI);
     when(request.getMethod()).thenReturn("DELETE");
     when(projectCache.get(Project.nameKey(PROJECT_NAME))).thenReturn(Optional.of(projectState));
@@ -278,7 +309,7 @@
     final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
     pullReplicationFilter.doFilter(request, response, filterChain);
 
-    verify(response).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    verify(response).setStatus(HttpServletResponse.SC_NOT_FOUND);
   }
 
   @Test
@@ -364,4 +395,33 @@
 
     verify(response).setStatus(HttpServletResponse.SC_BAD_REQUEST);
   }
+
+  @Test
+  public void shouldFilterBatchApplyObjectAction() throws Exception {
+    // Two apply-object entries in one JSON array; expect one batchApplyObjectAction.apply call.
+    byte[] payloadApplyObject =
+        ("[{\"label\":\"Replication\",\"ref_name\":\"refs/heads/foo\","
+                + "\"revision_data\":{"
+                + "\"commit_object\":{\"type\":1,\"content\":\"some-content\"},"
+                + "\"tree_object\":{\"type\":2,\"content\":\"some-content\"},"
+                + "\"blobs\":[]}"
+                + "},"
+                + "{\"label\":\"Replication\",\"ref_name\":\"refs/heads/bar\","
+                + "\"revision_data\":{"
+                + "\"commit_object\":{\"type\":1,\"content\":\"some-content\"},"
+                + "\"tree_object\":{\"type\":2,\"content\":\"some-content\"},"
+                + "\"blobs\":[]}"
+                + "}]")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadApplyObject, BATCH_APPLY_OBJECT_URI);
+
+    when(batchApplyObjectAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(batchApplyObjectAction).apply(any(ProjectResource.class), any());
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java
new file mode 100644
index 0000000..bf74b56
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.data;
+
+import java.util.Optional;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BatchApplyObjectDataTest {
+  // Mock stand-in for revision data; the test only passes it along and never stubs it.
+  @Mock private RevisionData revisionData;
+
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldFailIfRevisionDataIsPresentForADelete() {
+    BatchApplyObjectData.create("foo", Optional.of(revisionData), true);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
index 25aa2a7..dd8ec7d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
@@ -30,12 +30,16 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
 import org.apache.http.Header;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
@@ -163,6 +167,25 @@
   }
 
   @Test
+  public void shouldCallBatchFetchEndpoint() throws Exception {
+    // Two refs in one batch call must produce exactly one POST to the batch-fetch endpoint.
+    objectUnderTest.callBatchFetch(
+        Project.nameKey("test_repo"),
+        List.of(refName, RefNames.REFS_HEADS + "test"),
+        new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
+    assertThat(httpPost.getURI().getPath())
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~batch-fetch", urlAuthenticationPrefix()));
+    assertAuthentication(httpPost);
+  }
+
+  @Test
   public void shouldByDefaultCallSyncFetchForAllRefs() throws Exception {
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -199,6 +222,41 @@
   }
 
   @Test
+  public void shouldCallAsyncBatchFetchForAllRefs() throws Exception {
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"NO_SYNC_REFS"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":true},"
+            + "{\"label\":\"Replication\", \"ref_name\": \""
+            + refs.get(1)
+            + "\", \"async\":true}"
+            + "]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
   public void shouldCallSyncFetchOnlyForMetaRef() throws Exception {
     String metaRefName = "refs/changes/01/101/meta";
     String expectedMetaRefPayload =
@@ -230,8 +288,34 @@
   }
 
   @Test
-  public void shouldCallFetchEndpointWithPayload() throws Exception {
+  public void shouldCallSyncBatchFetchOnlyForMetaRef() throws Exception {
+    String metaRefName = "refs/changes/01/101/meta";
+    String expectedMetaRefPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \"" + metaRefName + "\", \"async\":false}]";
 
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/changes\\/.*\\/meta"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+
+    objectUnderTest.callBatchFetch(
+        Project.nameKey("test_repo"), List.of(metaRefName), new URIish(api));
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedMetaRefPayload);
+  }
+
+  @Test
+  public void shouldCallFetchEndpointWithPayload() throws Exception {
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
@@ -241,6 +325,93 @@
   }
 
   @Test
+  public void shouldCallBatchFetchEndpointWithPayload() throws Exception {
+    // readPayload declares Exception, hence the broad throws; expect one POST with both refs.
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":false},"
+            + "{\"label\":\"Replication\", \"ref_name\": \""
+            + testRef
+            + "\", \"async\":false}"
+            + "]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
+  public void shouldExecuteOneFetchCallForAsyncAndOneForSyncRefsDuringBatchFetch()
+      throws Exception {
+    // Only the test branch matches syncRefs, so it is expected in its own "async":false POST.
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/heads\\/test"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(2)).execute(httpPostCaptor.capture(), any());
+
+    List<HttpPost> httpPosts = httpPostCaptor.getAllValues();
+    String expectedSyncPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \""
+            + testRef
+            + "\", \"async\":false}"
+            + "]";
+    String expectedAsyncPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \"" + refName + "\", \"async\":true}]";
+
+    assertThat(readPayload(httpPosts.get(0))).isEqualTo(expectedAsyncPayload);
+    assertThat(readPayload(httpPosts.get(1))).isEqualTo(expectedSyncPayload);
+  }
+
+  @Test
+  public void shouldNotExecuteSyncFetchCallWhenAsyncCallFailsDuringBatchFetch() throws Exception {
+    // The stubbed 500 on the first (async) POST must prevent the sync POST from being sent.
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/heads\\/test"});
+    when(httpClient.execute(any(), any())).thenReturn(new HttpResult(500, Optional.of("BOOM")));
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    String testRef = RefNames.REFS_HEADS + "test";
+    List<String> refs = List.of(refName, testRef);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    String expectedAsyncPayload =
+        "[{\"label\":\"Replication\", \"ref_name\": \"" + refName + "\", \"async\":true}]";
+
+    assertThat(readPayload(httpPost)).isEqualTo(expectedAsyncPayload);
+  }
+
+  @Test
   public void shouldSetContentTypeHeader() throws Exception {
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
@@ -253,8 +424,19 @@
   }
 
   @Test
-  public void shouldCallSendObjectEndpoint() throws Exception {
+  public void shouldSetContentTypeHeaderInBatchFetch() throws Exception {
 
+    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), List.of(refName), new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getLastHeader("Content-Type").getValue())
+        .isEqualTo(expectedHeader.getValue());
+  }
+
+  @Test
+  public void shouldCallSendObjectEndpoint() throws Exception {
     objectUnderTest.callSendObject(
         Project.nameKey("test_repo"),
         refName,
@@ -428,19 +610,139 @@
     assertAuthentication(httpPut);
   }
 
+  @Test
+  public void shouldCallBatchSendObjectEndpoint() throws Exception {
+
+    List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>();
+    batchApplyObjects.add(
+        BatchApplyObjectData.create(refName, Optional.of(createSampleRevisionData("a")), false));
+
+    objectUnderTest.callBatchSendObject(
+        Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
+    assertThat(httpPost.getURI().getPath())
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~batch-apply-object",
+                urlAuthenticationPrefix()));
+    assertAuthentication(httpPost);
+  }
+
+  @Test
+  public void shouldCallBatchApplyObjectEndpointWithAListOfRefsInPayload() throws Exception {
+    List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>();
+    RevisionData revisionA = createSampleRevisionData("a");
+    RevisionData revisionB = createSampleRevisionData("b");
+    String refNameB = "refs/heads/b";
+    batchApplyObjects.add(BatchApplyObjectData.create(refName, Optional.of(revisionA), false));
+    batchApplyObjects.add(BatchApplyObjectData.create(refNameB, Optional.of(revisionB), false));
+
+    objectUnderTest.callBatchSendObject(
+        Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+
+    String expectedSendObjectsPayload =
+        "[{\"label\":\"Replication\",\"ref_name\":\""
+            + refName
+            + "\",\"event_created_on\":"
+            + eventCreatedOn
+            + ",\"revision_data\":{\"commit_object\":{\"sha1\":\""
+            + revisionA.getCommitObject().getSha1()
+            + "\",\"type\":1,\"content\":\"Y29tbWl0YWNvbnRlbnQ\\u003d\"},\"tree_object\":{\"sha1\":\""
+            + revisionA.getTreeObject().getSha1()
+            + "\",\"type\":2,\"content\":\"dHJlZWFjb250ZW50\"},\"blobs\":[{\"sha1\":\""
+            + revisionA.getBlobs().get(0).getSha1()
+            + "\",\"type\":3,\"content\":\"YmxvYmFjb250ZW50\"}]}},"
+            + "{\"label\":\"Replication\",\"ref_name\":\""
+            + refNameB
+            + "\",\"event_created_on\":"
+            + eventCreatedOn
+            + ",\"revision_data\":{\"commit_object\":{\"sha1\":\""
+            + revisionB.getCommitObject().getSha1()
+            + "\",\"type\":1,\"content\":\"Y29tbWl0YmNvbnRlbnQ\\u003d\"},\"tree_object\":{\"sha1\":\""
+            + revisionB.getTreeObject().getSha1()
+            + "\",\"type\":2,\"content\":\"dHJlZWJjb250ZW50\"},\"blobs\":[{\"sha1\":\""
+            + revisionB.getBlobs().get(0).getSha1()
+            + "\",\"type\":3,\"content\":\"YmxvYmJjb250ZW50\"}]}}]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectsPayload);
+  }
+
+  @Test
+  public void shouldCallBatchApplyObjectEndpointWithNoRevisionDataForDeletes() throws Exception {
+    List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>();
+    batchApplyObjects.add(BatchApplyObjectData.create(refName, Optional.empty(), true));
+    // An entry created without revision data must serialize with no "revision_data" field.
+    objectUnderTest.callBatchSendObject(
+        Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+
+    String expectedSendObjectsPayload =
+        "[{\"label\":\"Replication\",\"ref_name\":\""
+            + refName
+            + "\",\"event_created_on\":"
+            + eventCreatedOn
+            + "}]";
+    assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectsPayload);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldThrowExceptionIfDeleteFlagIsSetButRevisionDataIsPresentForBatchSendEndpoint()
+      throws Exception {
+    List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>();
+    batchApplyObjects.add(
+        BatchApplyObjectData.create(refName, Optional.of(createSampleRevisionData()), true));
+    // NOTE(review): create(...) presumably throws first; confirm the call below is ever reached.
+    objectUnderTest.callBatchSendObject(
+        Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api));
+  }
+
   public String readPayload(HttpPost entity) throws Exception {
     ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024);
     return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim();
   }
 
-  private RevisionData createSampleRevisionData() {
+  private RevisionData createSampleRevisionData(String prefix) {
+    String commitPrefix = "commit" + prefix;
+    String treePrefix = "tree" + prefix;
+    String blobPrefix = "blob" + prefix;
+    return createSampleRevisionData(
+        commitPrefix,
+        commitPrefix + "content",
+        treePrefix,
+        treePrefix + "content",
+        blobPrefix,
+        blobPrefix + "content");
+  }
+
+  private RevisionData createSampleRevisionData(
+      String commitObjectId,
+      String commitContent,
+      String treeObjectId,
+      String treeContent,
+      String blobObjectId,
+      String blobContent) {
     RevisionObjectData commitData =
-        new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, commitObject.getBytes());
+        new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, commitContent.getBytes());
     RevisionObjectData treeData =
-        new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, treeObject.getBytes());
+        new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, treeContent.getBytes());
     RevisionObjectData blobData =
-        new RevisionObjectData(blobObjectId, Constants.OBJ_BLOB, blobObject.getBytes());
+        new RevisionObjectData(blobObjectId, Constants.OBJ_BLOB, blobContent.getBytes());
     return new RevisionData(
         Collections.emptyList(), commitData, treeData, Lists.newArrayList(blobData));
   }
+
+  private RevisionData createSampleRevisionData() {
+    return createSampleRevisionData(
+        commitObjectId, commitObject, treeObjectId, treeObject, blobObjectId, blobObject);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java
index d82f3a5..3f73729 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java
@@ -35,6 +35,7 @@
           {HttpServletResponse.SC_ACCEPTED, true},
           {HttpServletResponse.SC_NO_CONTENT, true},
           {HttpServletResponse.SC_BAD_REQUEST, false},
+          {HttpServletResponse.SC_NOT_FOUND, false},
           {HttpServletResponse.SC_CONFLICT, false}
         });
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
deleted file mode 100644
index 81a4fc0..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull.event;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.index.change.ChangeIndexer;
-import com.googlesource.gerrit.plugins.replication.pull.Context;
-import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
-import org.eclipse.jgit.lib.RefUpdate;
-import org.eclipse.jgit.transport.URIish;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FetchRefReplicatedEventHandlerTest {
-  private ChangeIndexer changeIndexerMock;
-  private FetchRefReplicatedEventHandler fetchRefReplicatedEventHandler;
-  private static URIish sourceUri;
-
-  @Before
-  public void setUp() throws Exception {
-    changeIndexerMock = mock(ChangeIndexer.class);
-    fetchRefReplicatedEventHandler = new FetchRefReplicatedEventHandler(changeIndexerMock);
-    sourceUri = new URIish("git://aSourceNode/testProject.git");
-  }
-
-  @Test
-  public void onEventShouldIndexExistingChange() {
-    Project.NameKey projectNameKey = Project.nameKey("testProject");
-    String ref = "refs/changes/41/41/meta";
-    Change.Id changeId = Change.Id.fromRef(ref);
-    try {
-      Context.setLocalEvent(true);
-      fetchRefReplicatedEventHandler.onEvent(
-          new FetchRefReplicatedEvent(
-              projectNameKey.get(),
-              ref,
-              sourceUri,
-              ReplicationState.RefFetchResult.SUCCEEDED,
-              RefUpdate.Result.FAST_FORWARD));
-      verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
-    } finally {
-      Context.unsetLocalEvent();
-    }
-  }
-
-  @Test
-  public void onEventShouldNotIndexIfNotLocalEvent() {
-    Project.NameKey projectNameKey = Project.nameKey("testProject");
-    String ref = "refs/changes/41/41/meta";
-    Change.Id changeId = Change.Id.fromRef(ref);
-    fetchRefReplicatedEventHandler.onEvent(
-        new FetchRefReplicatedEvent(
-            projectNameKey.get(),
-            ref,
-            sourceUri,
-            ReplicationState.RefFetchResult.SUCCEEDED,
-            RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
-  }
-
-  @Test
-  public void onEventShouldIndexOnlyMetaRef() {
-    Project.NameKey projectNameKey = Project.nameKey("testProject");
-    String ref = "refs/changes/41/41/1";
-    Change.Id changeId = Change.Id.fromRef(ref);
-    fetchRefReplicatedEventHandler.onEvent(
-        new FetchRefReplicatedEvent(
-            projectNameKey.get(),
-            ref,
-            sourceUri,
-            ReplicationState.RefFetchResult.SUCCEEDED,
-            RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
-  }
-
-  @Test
-  public void onEventShouldNotIndexMissingChange() {
-    fetchRefReplicatedEventHandler.onEvent(
-        new FetchRefReplicatedEvent(
-            Project.nameKey("testProject").get(),
-            "invalidRef",
-            sourceUri,
-            ReplicationState.RefFetchResult.SUCCEEDED,
-            RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, never()).index(any(), any());
-  }
-
-  @Test
-  public void onEventShouldNotIndexFailingChange() {
-    Project.NameKey projectNameKey = Project.nameKey("testProject");
-    String ref = "refs/changes/41/41/meta";
-    fetchRefReplicatedEventHandler.onEvent(
-        new FetchRefReplicatedEvent(
-            projectNameKey.get(),
-            ref,
-            sourceUri,
-            ReplicationState.RefFetchResult.FAILED,
-            RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, never()).index(any(), any());
-  }
-
-  @Test
-  public void onEventShouldNotIndexNotAttemptedChange() {
-    Project.NameKey projectNameKey = Project.nameKey("testProject");
-    String ref = "refs/changes/41/41/meta";
-    fetchRefReplicatedEventHandler.onEvent(
-        new FetchRefReplicatedEvent(
-            projectNameKey.get(),
-            ref,
-            sourceUri,
-            ReplicationState.RefFetchResult.NOT_ATTEMPTED,
-            RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, never()).index(any(), any());
-  }
-}