Merge "Use `batch-ref-updated` stream event to trigger apply-object/fetch"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 13484b1..bcbd73d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -26,6 +26,9 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1.Context;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -59,6 +62,7 @@
 import org.eclipse.jgit.errors.InvalidObjectIdException;
 import org.eclipse.jgit.errors.MissingObjectException;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -75,6 +79,7 @@
   static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
 
   private static final Integer DEFAULT_FETCH_CALLS_TIMEOUT = 0;
+  private static final String BATCH_REF_UPDATED_EVENT_TYPE = BatchRefUpdateEvent.TYPE;
   private static final String REF_UDPATED_EVENT_TYPE = new RefUpdatedEvent().type;
   private static final String ZEROS_OBJECTID = ObjectId.zeroId().getName();
   private final ReplicationStateListener stateLog;
@@ -92,6 +97,7 @@
   private final ApplyObjectMetrics applyObjectMetrics;
   private final FetchReplicationMetrics fetchMetrics;
   private final String instanceId;
+  private final boolean useBatchUpdateEvents;
   private ApplyObjectsRefsFilter applyObjectsRefsFilter;
 
   @Inject
@@ -106,6 +112,7 @@
       ApplyObjectMetrics applyObjectMetrics,
       FetchReplicationMetrics fetchMetrics,
       @GerritInstanceId String instanceId,
+      @GerritServerConfig Config gerritConfig,
       ApplyObjectsRefsFilter applyObjectsRefsFilter) {
     workQueue = wq;
     dispatcher = dis;
@@ -118,6 +125,8 @@
     this.applyObjectMetrics = applyObjectMetrics;
     this.fetchMetrics = fetchMetrics;
     this.instanceId = instanceId;
+    this.useBatchUpdateEvents =
+        gerritConfig.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false);
     this.applyObjectsRefsFilter = applyObjectsRefsFilter;
   }
 
@@ -158,22 +167,46 @@
 
   @Override
   public void onEvent(com.google.gerrit.server.events.Event e) {
-    if (e.type.equals(REF_UDPATED_EVENT_TYPE) && instanceId.equals(e.instanceId)) {
+    if (!instanceId.equals(e.instanceId)) {
+      return;
+    }
+
+    if (useBatchUpdateEvents) {
+      if (e.type.equals(BATCH_REF_UPDATED_EVENT_TYPE)) {
+        BatchRefUpdateEvent event = (BatchRefUpdateEvent) e;
+
+        event.refUpdates.get().stream()
+            .sorted(ReplicationQueue::sortByMetaRefAsLast)
+            .forEachOrdered(
+                refUpdateAttribute -> {
+                  if (isRefToBeReplicated(refUpdateAttribute.refName)) {
+                    fireRefUpdate(refUpdateAttribute, e.eventCreatedOn);
+                  }
+                });
+      }
+      return;
+    }
+
+    if (e.type.equals(REF_UDPATED_EVENT_TYPE)) {
       RefUpdatedEvent event = (RefUpdatedEvent) e;
 
       if (isRefToBeReplicated(event.getRefName())) {
-        repLog.info(
-            "Ref event received: {} on project {}:{} - {} => {}",
-            refUpdateType(event),
-            event.refUpdate.get().project,
-            event.getRefName(),
-            event.refUpdate.get().oldRev,
-            event.refUpdate.get().newRev);
-        fire(ReferenceUpdatedEvent.from(event));
+        fireRefUpdate(event.refUpdate.get(), event.eventCreatedOn);
       }
     }
   }
 
+  private void fireRefUpdate(RefUpdateAttribute refUpdate, long eventCreatedOn) {
+    repLog.info(
+        "Ref event received: {} on project {}:{} - {} => {}",
+        refUpdateType(refUpdate.oldRev, refUpdate.newRev),
+        refUpdate.project,
+        refUpdate.refName,
+        refUpdate.oldRev,
+        refUpdate.newRev);
+    fire(ReferenceUpdatedEvent.from(refUpdate, eventCreatedOn));
+  }
+
   @Override
   public void onProjectDeleted(ProjectDeletedListener.Event event) {
     Project.NameKey project = Project.nameKey(event.getProjectName());
@@ -184,10 +217,16 @@
                 source.getApis().forEach(apiUrl -> source.scheduleDeleteProject(apiUrl, project)));
   }
 
-  private static String refUpdateType(RefUpdatedEvent event) {
-    if (ZEROS_OBJECTID.equals(event.refUpdate.get().oldRev)) {
+  private static int sortByMetaRefAsLast(RefUpdateAttribute a, RefUpdateAttribute b) {
+    repLog.debug("sortByMetaRefAsLast({} <=> {})", a.refName, b.refName);
+    return Boolean.compare(
+        RefNames.isNoteDbMetaRef(a.refName), RefNames.isNoteDbMetaRef(b.refName));
+  }
+
+  private static String refUpdateType(String oldRev, String newRev) {
+    if (ZEROS_OBJECTID.equals(oldRev)) {
       return "CREATE";
-    } else if (ZEROS_OBJECTID.equals(event.refUpdate.get().newRev)) {
+    } else if (ZEROS_OBJECTID.equals(newRev)) {
       return "DELETE";
     } else {
       return "UPDATE";
@@ -550,13 +589,13 @@
           projectName, refName, objectId, eventCreatedOn, isDelete);
     }
 
-    static ReferenceUpdatedEvent from(RefUpdatedEvent event) {
+    static ReferenceUpdatedEvent from(RefUpdateAttribute refUpdateAttribute, long eventCreatedOn) {
       return ReferenceUpdatedEvent.create(
-          event.refUpdate.get().project,
-          event.getRefName(),
-          ObjectId.fromString(event.refUpdate.get().newRev),
-          event.eventCreatedOn,
-          ZEROS_OBJECTID.equals(event.refUpdate.get().newRev));
+          refUpdateAttribute.project,
+          refUpdateAttribute.refName,
+          ObjectId.fromString(refUpdateAttribute.newRev),
+          eventCreatedOn,
+          ZEROS_OBJECTID.equals(refUpdateAttribute.newRev));
     }
 
     public abstract String projectName();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java
deleted file mode 100644
index 1472be2..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FakeGitReferenceUpdatedEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull;
-
-import com.google.common.base.Suppliers;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.data.RefUpdateAttribute;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-
-public class FakeGitReferenceUpdatedEvent extends RefUpdatedEvent {
-  FakeGitReferenceUpdatedEvent(
-      Project.NameKey project,
-      String ref,
-      String oldObjectId,
-      String newObjectId,
-      String instanceId) {
-    RefUpdateAttribute upd = new RefUpdateAttribute();
-    upd.newRev = newObjectId;
-    upd.oldRev = oldObjectId;
-    upd.project = project.get();
-    upd.refName = ref;
-    this.refUpdate = Suppliers.ofInstance(upd);
-    this.instanceId = instanceId;
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
similarity index 90%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
index be017d0..139ddec 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedAsyncIT.java
@@ -28,10 +28,15 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
+public class PullReplicationBatchRefUpdatedAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
 
   @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
+  }
+
+  @Override
   public void setUpTestPlugin() throws Exception {
     FileBasedConfig config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
index be017d0..2e727d1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationBatchRefUpdatedIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -17,10 +17,6 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +24,10 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationBatchRefUpdatedIT extends PullReplicationITBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
similarity index 74%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
index 4ede1ae..4349cde 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBase.java
@@ -17,27 +17,16 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
-import com.google.gerrit.acceptance.SkipProjectClone;
-import com.google.gerrit.acceptance.TestPlugin;
-import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
-import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Supplier;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -46,33 +35,14 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Test;
 
-@SkipProjectClone
-@UseLocalDisk
-@TestPlugin(
-    name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
-    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationFanoutConfigIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+abstract class PullReplicationFanoutConfigBase extends PullReplicationSetupBase {
   private static final int TEST_REPLICATION_DELAY = 60;
-  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
-  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
-  private static final String TEST_REPLICATION_REMOTE = "remote1";
 
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
-  private Path gitPath;
-  private FileBasedConfig config;
   private FileBasedConfig remoteConfig;
-  private FileBasedConfig secureConfig;
 
   @Override
   public void setUpTestPlugin() throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
-
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
     remoteConfig =
         new FileBasedConfig(
             sitePaths
@@ -81,17 +51,8 @@
                 .toFile(),
             FS.DETECTED);
 
-    setReplicationSource(
-        TEST_REPLICATION_REMOTE); // Simulates a full replication.config initialization
-
     setRemoteConfig(TEST_REPLICATION_SUFFIX, ALL_PROJECTS);
-
-    secureConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
-    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
-    secureConfig.save();
-
-    super.setUpTestPlugin();
+    super.setUpTestPlugin(false);
   }
 
   @Test
@@ -104,8 +65,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -140,8 +101,8 @@
     RevCommit sourceCommit = pushResult.getCommit();
     final String sourceRef = pushResult.getPatchSet().refName();
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -173,8 +134,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -251,17 +212,11 @@
     waitUntil(() -> sources.getAll().size() == 1);
   }
 
-  private Ref getRef(Repository repo, String branchName) throws IOException {
-    return repo.getRefDatabase().exactRef(branchName);
-  }
-
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    setReplicationSource(remoteName);
   }
 
   private void setReplicationSource(String remoteName) throws IOException {
@@ -294,23 +249,4 @@
     project.ifPresent(prj -> remoteConfig.setString("remote", null, "projects", prj));
     remoteConfig.save();
   }
-
-  private void setReplicationCredentials(String remoteName, String username, String password)
-      throws IOException {
-    secureConfig.setString("remote", remoteName, "username", username);
-    secureConfig.setString("remote", remoteName, "password", password);
-    secureConfig.save();
-  }
-
-  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
-    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
-  }
-
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
index be017d0..7babf34 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigBatchRefUpdateEventIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,16 +11,11 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +23,11 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationFanoutConfigBatchRefUpdateEventIT
+    extends PullReplicationFanoutConfigBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return true;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
index be017d0..16b0b02 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigRefUpdatedEventIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,16 +11,11 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +23,10 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationFanoutConfigRefUpdatedEventIT extends PullReplicationFanoutConfigBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return false;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index ba812e2..50aa1b2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -31,6 +31,7 @@
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.util.Collection;
@@ -83,8 +84,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -116,8 +117,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -159,8 +160,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -185,8 +186,8 @@
     assertThat(pushedRefs).hasSize(1);
     assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK);
 
-    FakeGitReferenceUpdatedEvent forcedPushEvent =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent forcedPushEvent =
+        generateUpdateEvent(
             project,
             newBranch,
             branchRevision,
@@ -224,8 +225,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -265,8 +266,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -357,8 +358,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
similarity index 93%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
index 5d4aeb4..e76faf5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITBase.java
@@ -21,8 +21,6 @@
 import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS;
 
 import com.google.gerrit.acceptance.PushOneCommit.Result;
-import com.google.gerrit.acceptance.SkipProjectClone;
-import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Permission;
@@ -33,6 +31,7 @@
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.util.Collection;
@@ -51,13 +50,7 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
-@SkipProjectClone
-@UseLocalDisk
-@TestPlugin(
-    name = "pull-replication",
-    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
-    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationIT extends PullReplicationSetupBase {
+abstract class PullReplicationITBase extends PullReplicationSetupBase {
 
   @Override
   protected void setReplicationSource(
@@ -90,8 +83,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -123,8 +116,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -166,8 +159,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -192,8 +185,8 @@
     assertThat(pushedRefs).hasSize(1);
     assertThat(pushedRefs.iterator().next().getStatus()).isEqualTo(Status.OK);
 
-    FakeGitReferenceUpdatedEvent forcedPushEvent =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent forcedPushEvent =
+        generateUpdateEvent(
             project,
             newBranch,
             branchRevision,
@@ -231,8 +224,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
@@ -272,8 +265,8 @@
 
     ReplicationQueue pullReplicationQueue =
         plugin.getSysInjector().getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             newBranch,
             ObjectId.zeroId().getName(),
@@ -363,8 +356,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
similarity index 90%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
index be017d0..2ca7538 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedAsyncIT.java
@@ -28,10 +28,15 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
+public class PullReplicationRefUpdatedAsyncIT extends PullReplicationITAbstract {
   @Inject private SitePaths sitePaths;
 
   @Override
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // opt out: this variant does not use batch-ref-updated events
+  }
+
+  @Override
   public void setUpTestPlugin() throws Exception {
     FileBasedConfig config =
         new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
index be017d0..6e7c369 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationRefUpdatedIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -17,10 +17,6 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +24,10 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationRefUpdatedIT extends PullReplicationITBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // opt out: this variant does not use batch-ref-updated events
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
index e07d481..1ef4d35 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -16,13 +16,20 @@
 
 import static java.util.stream.Collectors.toList;
 
+import com.google.common.base.Suppliers;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.data.AccountAttribute;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.inject.Inject;
 import java.io.File;
 import java.io.IOException;
@@ -54,6 +61,22 @@
   protected FileBasedConfig config;
   protected FileBasedConfig secureConfig;
 
+  protected abstract boolean useBatchRefUpdateEvent(); // true selects BatchRefUpdateEvent
+
+  protected ProjectEvent generateUpdateEvent(
+      Project.NameKey project,
+      String ref,
+      String oldObjectId,
+      String newObjectId,
+      String instanceId) {
+    // Concrete test classes choose, through useBatchRefUpdateEvent(), whether the
+    // scenario is driven by a batch-ref-updated event or by a plain ref-updated
+    // event; either way the event is built from the very same arguments.
+    return useBatchRefUpdateEvent()
+        ? generateBatchRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId)
+        : generateRefUpdateEvent(project, ref, oldObjectId, newObjectId, instanceId);
+  }
+
   protected void setUpTestPlugin(boolean loadExisting) throws Exception {
     gitPath = sitePaths.site_path.resolve("git");
 
@@ -73,6 +96,9 @@
     setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
     secureConfig.save();
 
+    cfg.setBoolean(
+        "event", "stream-events", "enableBatchRefUpdatedEvents", useBatchRefUpdateEvent());
+
     super.setUpTestPlugin();
   }
 
@@ -121,4 +147,38 @@
       List<String> replicaSuffixes, Function<String, String> toURL) {
     return replicaSuffixes.stream().map(suffix -> toURL.apply(suffix)).collect(toList());
   }
+
+  private BatchRefUpdateEvent generateBatchRefUpdateEvent(
+      Project.NameKey project,
+      String ref,
+      String oldObjectId,
+      String newObjectId,
+      String instanceId) {
+    // A batch made of exactly one ref update, with the `admin` account as submitter.
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.project = project.get();
+    refUpdate.refName = ref;
+    refUpdate.oldRev = oldObjectId;
+    refUpdate.newRev = newObjectId;
+    AccountAttribute submitter = new AccountAttribute(admin.id().get());
+    BatchRefUpdateEvent batchEvent =
+        new BatchRefUpdateEvent(
+            project, Suppliers.ofInstance(List.of(refUpdate)), Suppliers.ofInstance(submitter));
+    batchEvent.instanceId = instanceId;
+    return batchEvent;
+  }
+
+  private ProjectEvent generateRefUpdateEvent(
+      NameKey project, String ref, String oldObjectId, String newObjectId, String instanceId) {
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute(); // the one ref this event reports
+    refUpdate.project = project.get();
+    refUpdate.refName = ref;
+    refUpdate.oldRev = oldObjectId;
+    refUpdate.newRev = newObjectId;
+    RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
+    refUpdatedEvent.instanceId = instanceId;
+    refUpdatedEvent.submitter = Suppliers.ofInstance(new AccountAttribute(admin.id().get()));
+    refUpdatedEvent.refUpdate = Suppliers.ofInstance(refUpdate);
+    return refUpdatedEvent;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
similarity index 94%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
index e55e383..2e95ef1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBase.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.ProjectEvent;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -36,7 +37,8 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationWithGitHttpTransportProtocolIT extends PullReplicationSetupBase {
+public abstract class PullReplicationWithGitHttpTransportProtocolBase
+    extends PullReplicationSetupBase {
 
   @Override
   protected void setReplicationSource(
@@ -80,8 +82,8 @@
     String sourceRef = pushResult.getPatchSet().refName();
 
     ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
-    FakeGitReferenceUpdatedEvent event =
-        new FakeGitReferenceUpdatedEvent(
+    ProjectEvent event =
+        generateUpdateEvent(
             project,
             sourceRef,
             ObjectId.zeroId().getName(),
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
index be017d0..8c8ca37 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -17,10 +17,6 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +24,11 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationWithGitHttpTransportProtocolBatchRefUpdatedIT
+    extends PullReplicationWithGitHttpTransportProtocolBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return true; // opt in: this variant uses batch-ref-updated events
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
similarity index 62%
copy from src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
copy to src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
index be017d0..5f3c7b6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationAsyncIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolRefUpdatedIT.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2022 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -17,10 +17,6 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 
 @SkipProjectClone
 @UseLocalDisk
@@ -28,16 +24,11 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationAsyncIT extends PullReplicationITAbstract {
-  @Inject private SitePaths sitePaths;
+public class PullReplicationWithGitHttpTransportProtocolRefUpdatedIT
+    extends PullReplicationWithGitHttpTransportProtocolBase {
 
   @Override
-  public void setUpTestPlugin() throws Exception {
-    FileBasedConfig config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.setString("replication", null, "syncRefs", "^$");
-    config.save();
-
-    super.setUpTestPlugin(true);
+  protected boolean useBatchRefUpdateEvent() {
+    return false; // opt out: this variant does not use batch-ref-updated events
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 15e2c1b..4f853a2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -22,6 +22,7 @@
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -34,12 +35,13 @@
 import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
-import com.google.gerrit.extensions.common.AccountInfo;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.data.AccountAttribute;
 import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.BatchRefUpdateEvent;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -59,7 +61,9 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.LargeObjectException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
@@ -68,6 +72,7 @@
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -78,6 +83,10 @@
   private static final String FOREIGN_INSTANCE_ID = "any other instance id";
   private static final String TEST_REF_NAME = "refs/meta/heads/anyref";
 
+  private static final Project.NameKey PROJECT = Project.nameKey("defaultProject");
+  private static final String NEW_OBJECT_ID =
+      ObjectId.fromString("3c1ddc050d7906adb0e29bc3bc46af8749b2f63b").getName();
+
   @Mock private WorkQueue wq;
   @Mock private Source source;
   @Mock private SourcesCollection sourceCollection;
@@ -86,7 +95,7 @@
   @Mock ReplicationStateListeners sl;
   @Mock FetchRestApiClient fetchRestApiClient;
   @Mock FetchApiClient.Factory fetchClientFactory;
-  @Mock AccountInfo accountInfo;
+  @Mock AccountAttribute accountAttribute;
   @Mock RevisionReader revReader;
   @Mock RevisionData revisionData;
   @Mock HttpResult successfulHttpResult;
@@ -95,6 +104,8 @@
   List<ObjectId> revisionDataParentObjectIds;
   @Mock HttpResult httpResult;
   @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter;
+
+  @Mock Config config;
   ApplyObjectMetrics applyObjectMetrics;
   FetchReplicationMetrics fetchMetrics;
 
@@ -121,6 +132,8 @@
     when(source.getApis()).thenReturn(apis);
     when(sourceCollection.getAll()).thenReturn(Lists.newArrayList(source));
     when(rd.get()).thenReturn(sourceCollection);
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(true);
     lenient()
         .when(revReader.read(any(), any(), anyString(), eq(0)))
         .thenReturn(Optional.of(revisionData));
@@ -169,11 +182,39 @@
             applyObjectMetrics,
             fetchMetrics,
             LOCAL_INSTANCE_ID,
+            config,
             applyObjectsRefsFilter);
   }
 
   @Test
   public void shouldCallSendObjectWhenMetaRef() throws IOException {
+    // A batch event carrying only a meta ref must trigger exactly one callSendObjects.
+    objectUnderTest.start();
+    objectUnderTest.onEvent(generateBatchRefUpdateEvent("refs/changes/01/1/meta"));
+    verify(fetchRestApiClient, times(1))
+        .callSendObjects(any(), anyString(), anyLong(), any(), any());
+  }
+
+  @Test
+  public void shouldCallSendObjectWhenMetaRefAndRefUpdateEvent() throws IOException {
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(false);
+
+    objectUnderTest =
+        new ReplicationQueue(
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            () -> revReader,
+            applyObjectMetrics,
+            fetchMetrics,
+            LOCAL_INSTANCE_ID,
+            config,
+            applyObjectsRefsFilter);
+
     Event event = new TestEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
@@ -183,7 +224,7 @@
 
   @Test
   public void shouldIgnoreEventWhenIsNotLocalInstanceId() throws IOException {
-    Event event = new TestEvent();
+    Event event = generateBatchRefUpdateEvent(TEST_REF_NAME);
     event.instanceId = FOREIGN_INSTANCE_ID;
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
@@ -194,7 +235,7 @@
 
   @Test
   public void shouldCallInitProjectWhenProjectIsMissing() throws IOException {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     when(httpResult.isSuccessful()).thenReturn(false);
     when(httpResult.isProjectMissing(any())).thenReturn(true);
     when(source.isCreateMissingRepositories()).thenReturn(true);
@@ -206,8 +247,24 @@
   }
 
   @Test
+  public void shouldCallSendObjectReorderingRefsHavingMetaAtTheEnd() throws IOException {
+    objectUnderTest.start();
+    objectUnderTest.onEvent(
+        generateBatchRefUpdateEvent("refs/changes/01/1/meta", "refs/changes/01/1/1"));
+    verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta"); // meta last
+  }
+
+  @Test
+  public void shouldCallSendObjectKeepingMetaAtTheEnd() throws IOException {
+    objectUnderTest.start();
+    objectUnderTest.onEvent(
+        generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/01/1/meta"));
+    verifySendObjectOrdering("refs/changes/01/1/1", "refs/changes/01/1/meta"); // order kept
+  }
+
+  @Test
   public void shouldNotCallInitProjectWhenReplicateNewRepositoriesNotSet() throws IOException {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     when(httpResult.isSuccessful()).thenReturn(false);
     when(httpResult.isProjectMissing(any())).thenReturn(true);
     when(source.isCreateMissingRepositories()).thenReturn(false);
@@ -219,7 +276,25 @@
   }
 
   @Test
-  public void shouldCallSendObjectWhenPatchSetRef() throws IOException {
+  public void shouldCallSendObjectWhenPatchSetRefAndRefUpdateEvent() throws IOException {
+    when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false))
+        .thenReturn(false);
+
+    objectUnderTest =
+        new ReplicationQueue(
+            wq,
+            rd,
+            dis,
+            sl,
+            fetchClientFactory,
+            refsFilter,
+            () -> revReader,
+            applyObjectMetrics,
+            fetchMetrics,
+            LOCAL_INSTANCE_ID,
+            config,
+            applyObjectsRefsFilter);
+
     Event event = new TestEvent("refs/changes/01/1/1");
     objectUnderTest.start();
     objectUnderTest.onEvent(event);
@@ -228,9 +303,18 @@
   }
 
   @Test
+  public void shouldCallSendObjectWhenPatchSetRef() throws IOException {
+    // A batch event carrying only a patch-set ref must trigger exactly one callSendObjects.
+    objectUnderTest.start();
+    objectUnderTest.onEvent(generateBatchRefUpdateEvent("refs/changes/01/1/1"));
+    verify(fetchRestApiClient, times(1))
+        .callSendObjects(any(), anyString(), anyLong(), any(), any());
+  }
+
+  @Test
   public void shouldFallbackToCallFetchWhenIOException()
       throws IOException, LargeObjectException, RefUpdateException {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
 
     when(revReader.read(any(), any(), anyString(), anyInt())).thenThrow(IOException.class);
@@ -243,7 +327,7 @@
   @Test
   public void shouldFallbackToCallFetchWhenLargeRef()
       throws IOException, LargeObjectException, RefUpdateException {
-    Event event = new TestEvent("refs/changes/01/1/1");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
 
     when(revReader.read(any(), any(), anyString(), anyInt())).thenReturn(Optional.empty());
@@ -255,7 +339,7 @@
 
   @Test
   public void shouldFallbackToCallFetchWhenParentObjectIsMissing() throws IOException {
-    Event event = new TestEvent("refs/changes/01/1/1");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
 
     when(httpResult.isSuccessful()).thenReturn(false);
@@ -271,7 +355,7 @@
   @Test
   public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnMetaRef()
       throws IOException {
-    Event event = new TestEvent("refs/changes/01/1/meta");
+    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
     objectUnderTest.start();
 
     when(httpResult.isSuccessful()).thenReturn(false, true);
@@ -298,7 +382,7 @@
   public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnAllowedRefs()
       throws IOException {
     String refName = "refs/tags/test-tag";
-    Event event = new TestEvent(refName);
+    Event event = generateBatchRefUpdateEvent(refName);
     objectUnderTest.start();
 
     when(httpResult.isSuccessful()).thenReturn(false, true);
@@ -343,19 +427,20 @@
             applyObjectMetrics,
             fetchMetrics,
             LOCAL_INSTANCE_ID,
+            config,
             applyObjectsRefsFilter);
-    Event event = new TestEvent("refs/multi-site/version");
+    Event event = generateBatchRefUpdateEvent("refs/multi-site/version");
     objectUnderTest.onEvent(event);
 
-    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute);
   }
 
   @Test
   public void shouldSkipEventWhenStarredChangesRef() {
-    Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
+    Event event = generateBatchRefUpdateEvent("refs/starred-changes/41/2941/1000000");
     objectUnderTest.onEvent(event);
 
-    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountAttribute);
   }
 
   @Test
@@ -418,6 +503,38 @@
     return createTempDirectory(prefix);
   }
 
+  private BatchRefUpdateEvent generateBatchRefUpdateEvent(String... refs) {
+    // Every ref update goes from the zero object id to NEW_OBJECT_ID within PROJECT.
+    List<RefUpdateAttribute> updates =
+        Arrays.stream(refs)
+            .map(
+                refName -> {
+                  RefUpdateAttribute attr = new RefUpdateAttribute();
+                  attr.project = PROJECT.get();
+                  attr.refName = refName;
+                  attr.oldRev = ObjectId.zeroId().getName();
+                  attr.newRev = NEW_OBJECT_ID;
+                  return attr;
+                })
+            .collect(Collectors.toList());
+    BatchRefUpdateEvent batchEvent =
+        new BatchRefUpdateEvent(
+            PROJECT, Suppliers.ofInstance(updates), Suppliers.ofInstance(accountAttribute));
+    batchEvent.instanceId = LOCAL_INSTANCE_ID;
+    return batchEvent;
+  }
+
+  private void verifySendObjectOrdering(String firstRef, String secondRef) throws IOException {
+    // Checks the relative order of the two callSendObjects invocations on the mocked
+    // client: the one for firstRef has to be followed by the one for secondRef.
+    InOrder sendOrder = inOrder(fetchRestApiClient);
+    for (String expectedRef : Arrays.asList(firstRef, secondRef)) {
+      sendOrder
+          .verify(fetchRestApiClient)
+          .callSendObjects(any(), eq(expectedRef), anyLong(), any(), any());
+    }
+  }
+
   private class TestEvent extends RefUpdatedEvent {
 
     public TestEvent() {