Merge "Merge branch 'stable-3.3' into stable-3.4" into stable-3.6
diff --git a/DESIGN.md b/DESIGN.md
index 1bbe78f..b114eb3 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -2,7 +2,7 @@
 
 This document collects and organizes thoughts about
 the design of the Gerrit multi-site plugin,  supporting the definition of the
-[implementation roadmap](#next-steps-in-the-road-map).
+[implementation roadmap](#next-steps-in-the-roadmap).
 
 It first presents background for the problems the plugin will address and
 the tools currently available in the Gerrit ecosystem that support the
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index 3ecb9fd..b069565 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,8 +16,8 @@
 
 LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.4
-GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
+GERRIT_BRANCH=stable-3.6
+GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
 DEF_GERRIT_IMAGE=3.4.0-centos8
@@ -25,6 +25,7 @@
 DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
 DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
 DEF_GERRIT_HEALTHCHECK_RETRIES=5
+COMMON_PLUGINS_LIST="websession-broker healthcheck zookeeper-refdb"
 
 function check_application_requirements {
   type java >/dev/null 2>&1 || { echo >&2 "Require java but it's not installed. Aborting."; exit 1; }
@@ -112,6 +113,19 @@
   return 0
 }
 
+function download_plugin {
+  local PLUGIN_NAME=$1
+
+  echo "Downloading $PLUGIN_NAME plugin $GERRIT_BRANCH onto $TARGET_DIR"
+  wget $GERRIT_CI/plugin-$PLUGIN_NAME-bazel-$GERRIT_BRANCH/$LAST_BUILD/$PLUGIN_NAME/$PLUGIN_NAME.jar \
+    -O $COMMON_PLUGINS/$PLUGIN_NAME.jar || \
+  wget $GERRIT_CI/plugin-$PLUGIN_NAME-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/$PLUGIN_NAME/$PLUGIN_NAME.jar \
+    -O $COMMON_PLUGINS/$PLUGIN_NAME.jar || \
+  { echo >&2 "Cannot download $PLUGIN_NAME plugin: Check internet connection. Aborting"; exit 1; }
+
+  return 0
+}
+
 # Check application requirements
 check_application_requirements
 
@@ -220,27 +234,14 @@
 echo "Downloading common plugins"
 COMMON_PLUGINS=${DEPLOYMENT_LOCATION}/common_plugins
 mkdir -p ${COMMON_PLUGINS}
+for plugin in $COMMON_PLUGINS_LIST; do download_plugin $plugin; done
 
 echo "plugin location[${MULTISITE_LIB_LOCATION}]"
 cp -f $MULTISITE_LIB_LOCATION $COMMON_PLUGINS/multi-site.jar  >/dev/null 2>&1 || \
   { echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
 
-echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-  -O $COMMON_PLUGINS/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Aborting"; exit 1; }
-
-echo "Downloading healthcheck plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
-  -O $COMMON_PLUGINS/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Aborting"; exit 1; }
-
-echo "Downloading zookeeper plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
-  -O $COMMON_PLUGINS/zookeeper-refdb.jar || { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Aborting"; exit 1; }
-
 if [ "$BROKER_TYPE" = "kafka" ]; then
-  echo "Downloading events-kafka plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
-    -O $COMMON_PLUGINS/events-kafka.jar || { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Aborting"; exit 1; }
+  download_plugin events-kafka $COMMON_PLUGINS
   BROKER_PORT=9092
   BROKER_HOST=kafka
   BROKER_PLUGIN=events-kafka
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 8a176f2..20075d7 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -39,8 +39,6 @@
     directory = cache
 [plugins]
     allowRemoteAdmin = true
-[plugin "websession-flatfile"]
-    directory = $FAKE_NFS
 [plugin "events-kafka"]
     sendAsync = true
     bootstrapServers = $BROKER_HOST:$BROKER_PORT
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 247a721..7697f24 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,8 +16,8 @@
 
 
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.4
-GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
+GERRIT_BRANCH=stable-3.6
+GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 
 function check_application_requirements {
@@ -224,6 +224,16 @@
   fi
 }
 
+function download_artifact_from_ci {
+  local artifact_name=$1
+  local prefix=${2:-plugin}
+  wget $GERRIT_CI/$prefix-$artifact_name-bazel-$GERRIT_BRANCH/$LAST_BUILD/$artifact_name/$artifact_name.jar \
+  -O $DEPLOYMENT_LOCATION/$artifact_name.jar || \
+  wget $GERRIT_CI/$prefix-$artifact_name-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/$artifact_name/$artifact_name.jar \
+  -O $DEPLOYMENT_LOCATION/$artifact_name.jar || \
+  { echo >&2 "Cannot download $artifact_name $prefix: Check internet connection. Aborting"; exit 1; }
+}
+
 while [ $# -ne 0 ]
 do
 case "$1" in
@@ -429,70 +439,37 @@
 
 if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
   echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-  -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
-  wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-  -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
-  { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
-ing"; exit 1; }
-  wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
-  -O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci websession-broker
+  download_artifact_from_ci healthcheck
+
 else
   echo "Without the websession-broker; user login via haproxy will fail."
 fi
 
 echo "Downloading zookeeper plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
-  -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
-  wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
-  -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
-  { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci zookeeper-refdb
 
 if [ "$BROKER_TYPE" = "kafka" ]; then
 echo "Downloading events-kafka plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
-  -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
-  wget $GERRIT_CI/plugin-events-kafka-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
-  -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
-  { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci events-kafka
 fi
 
 if [ "$BROKER_TYPE" = "kinesis" ]; then
 echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
-  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
-  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
-  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
-  { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci events-aws-kinesis
 fi
 
 
 if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
 echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
-  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
-  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
-  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
-  { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci events-gcloud-pubsub
 fi
 
 echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
-  -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
-  wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
-  -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
-  { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci metrics-reporter-prometheus
 
 echo "Downloading pull-replication plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-pull-replication-bazel-$GERRIT_BRANCH/$LAST_BUILD/pull-replication/pull-replication.jar \
-  -O $DEPLOYMENT_LOCATION/pull-replication.jar || { echo >&2 "Cannot download pull-replication plugin: Check internet connection. Abort\
-ing"; exit 1; }
+  download_artifact_from_ci pull-replication
 
 if [ "$HTTPS_ENABLED" = "true" ];then
   export HTTP_PROTOCOL="https"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 2d67eab..e3f53ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -65,6 +65,8 @@
   private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
 
+  private static final long DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT = 30000;
+
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
@@ -227,6 +229,17 @@
     }
   }
 
+  private static long getLong(
+      Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+    try {
+      return cfg.get().getLong(section, subSection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
   public static class Projects {
     public static final String SECTION = "projects";
     public static final String PATTERN_KEY = "pattern";
@@ -293,12 +306,15 @@
     static final String INDEX_SECTION = "index";
     static final String MAX_TRIES_KEY = "maxTries";
     static final String RETRY_INTERVAL_KEY = "retryInterval";
+    static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
+    static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
 
     private final int threadPoolSize;
     private final int retryInterval;
     private final int maxTries;
 
     private final int numStripedLocks;
+    private final boolean synchronizeForced;
 
     private Index(Supplier<Config> cfg) {
       super(cfg, INDEX_SECTION);
@@ -309,6 +325,8 @@
       maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
       numStripedLocks =
           getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+      synchronizeForced =
+          getBoolean(cfg, INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
     }
 
     public int threadPoolSize() {
@@ -326,19 +344,36 @@
     public int numStripedLocks() {
       return numStripedLocks;
     }
+
+    public boolean synchronizeForced() {
+      return synchronizeForced;
+    }
   }
 
   public static class Broker {
     static final String BROKER_SECTION = "broker";
+    static final String STREAM_EVENT_PUBLISH_TIMEOUT = "streamEventPublishTimeoutMs";
     private final Config cfg;
+    private long streamEventPublishTimeout;
 
     Broker(Supplier<Config> cfgSupplier) {
       cfg = cfgSupplier.get();
+      streamEventPublishTimeout =
+          getLong(
+              cfgSupplier,
+              BROKER_SECTION,
+              null,
+              STREAM_EVENT_PUBLISH_TIMEOUT,
+              DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT);
     }
 
     public String getTopic(String topicKey, String defValue) {
       return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
     }
+
+    public long getStreamEventPublishTimeout() {
+      return streamEventPublishTimeout;
+    }
   }
 
   public static class ReplicationFilter {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
index 106bcde..7c88185 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
@@ -23,10 +23,11 @@
 public abstract class LibModuleLogFile {
 
   public LibModuleLogFile(SystemLog systemLog, String logName, Layout layout) {
-    AsyncAppender asyncAppender = systemLog.createAsyncAppender(logName, layout, true, true);
     Logger logger = LogManager.getLogger(logName);
-    logger.removeAppender(logName);
-    logger.addAppender(asyncAppender);
-    logger.setAdditivity(false);
+    if (logger.getAppender(logName) == null) {
+      AsyncAppender asyncAppender = systemLog.createAsyncAppender(logName, layout, true, true);
+      logger.addAppender(asyncAppender);
+      logger.setAdditivity(false);
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index f58efa7..88422e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -22,11 +22,11 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
@@ -39,17 +39,20 @@
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
+  private final String nodeInstanceId;
 
   @Inject
   public BrokerApiWrapper(
       @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      @GerritInstanceId String instanceId) {
     this.apiDelegate = apiDelegate;
     this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
+    this.nodeInstanceId = instanceId;
   }
 
   public boolean sendSync(String topic, Event event) {
@@ -70,7 +73,7 @@
   @Override
   public ListenableFuture<Boolean> send(String topic, Event message) {
     SettableFuture<Boolean> resultFuture = SettableFuture.create();
-    if (Context.isForwardedEvent()) {
+    if (!nodeInstanceId.equals(message.instanceId)) {
       resultFuture.set(true);
       return resultFuture;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
index 6373f58..5e54951 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
@@ -14,18 +14,31 @@
 
 package com.googlesource.gerrit.plugins.multisite.cache;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
 import java.util.concurrent.Executor;
 
 public class CacheModule extends LifecycleModule {
 
+  private final Class<? extends ExecutorProvider> cacheExecutorProviderClass;
+
+  public CacheModule() {
+    this(CacheExecutorProvider.class);
+  }
+
+  @VisibleForTesting
+  public CacheModule(Class<? extends ExecutorProvider> cacheExecutorProviderClass) {
+    this.cacheExecutorProviderClass = cacheExecutorProviderClass;
+  }
+
   @Override
   protected void configure() {
-    bind(Executor.class).annotatedWith(CacheExecutor.class).toProvider(CacheExecutorProvider.class);
+    bind(Executor.class).annotatedWith(CacheExecutor.class).toProvider(cacheExecutorProviderClass);
     listener().to(CacheExecutorProvider.class);
     DynamicSet.bind(binder(), CacheRemovalListener.class).to(CacheEvictionHandler.class);
     DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ProjectListUpdateHandler.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index 44f8417..6a2c7a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.cache;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.events.ProjectEvent;
@@ -33,18 +32,15 @@
 
   private final DynamicSet<ProjectListUpdateForwarder> forwarders;
   private final Executor executor;
-  private final ProjectsFilter projectsFilter;
   private final String instanceId;
 
   @Inject
   public ProjectListUpdateHandler(
       DynamicSet<ProjectListUpdateForwarder> forwarders,
       @CacheExecutor Executor executor,
-      ProjectsFilter filter,
       @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
-    this.projectsFilter = filter;
     this.instanceId = instanceId;
   }
 
@@ -61,7 +57,7 @@
   }
 
   private void process(ProjectEvent event, boolean delete) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
+    if (!Context.isForwardedEvent()) {
       executor.execute(
           new ProjectListUpdateTask(
               new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7107b87..c2b9e9c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -70,13 +70,9 @@
     if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
         || !shouldConsumeEvent(event)) {
       if (Strings.isNullOrEmpty(sourceInstanceId)) {
-        logger.atWarning().log(
-            String.format(
-                "Dropping event %s because sourceInstanceId cannot be null", event.toString()));
+        logger.atWarning().log("Dropping event %s because sourceInstanceId cannot be null", event);
       } else if (instanceId.equals(sourceInstanceId)) {
-        logger.atFiner().log(
-            String.format(
-                "Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
+        logger.atFiner().log("Dropping event %s produced by our instanceId %s", event, instanceId);
       }
       droppedEventListeners.forEach(l -> l.onEventDropped(event));
     } else {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
deleted file mode 100644
index 07bbbf7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface EventExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
deleted file mode 100644
index e4979f9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
-
-@Singleton
-class EventExecutorProvider extends ExecutorProvider {
-
-  @Inject
-  EventExecutorProvider(WorkQueue workQueue) {
-    super(workQueue, 1, "Forward-Stream-Event");
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
deleted file mode 100644
index b2efb80..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import java.util.concurrent.Executor;
-
-class EventHandler implements EventListener {
-  private final Executor executor;
-  private final DynamicSet<StreamEventForwarder> forwarders;
-  private final ProjectsFilter projectsFilter;
-
-  @Inject
-  EventHandler(
-      DynamicSet<StreamEventForwarder> forwarders,
-      @EventExecutor Executor executor,
-      ProjectsFilter projectsFilter) {
-    this.forwarders = forwarders;
-    this.executor = executor;
-    this.projectsFilter = projectsFilter;
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    if (!Context.isForwardedEvent() && event instanceof ProjectEvent) {
-      if (projectsFilter.matches(event)) {
-        executor.execute(new EventTask(event));
-      }
-    }
-  }
-
-  class EventTask implements Runnable {
-    private final Event event;
-
-    EventTask(Event event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      forwarders.forEach(f -> f.send(event));
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Send event '%s' to target instance", event.type);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index 5f16210..8c67823 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -14,37 +14,42 @@
 
 package com.googlesource.gerrit.plugins.multisite.event;
 
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherModule;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.events.EventListener;
-import com.google.inject.Scopes;
+import com.google.inject.Inject;
 import com.google.inject.multibindings.OptionalBinder;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdateImpl;
-import java.util.concurrent.Executor;
 
 public class EventModule extends LifecycleModule {
+  private final Configuration configuration;
 
-  private final Configuration config;
-
-  public EventModule(Configuration config) {
-    this.config = config;
+  @Inject
+  public EventModule(Configuration configuration) {
+    this.configuration = configuration;
   }
 
   @Override
   protected void configure() {
-    bind(Executor.class).annotatedWith(EventExecutor.class).toProvider(EventExecutorProvider.class);
-    listener().to(EventExecutorProvider.class);
-    DynamicSet.bind(binder(), EventListener.class).to(EventHandler.class);
+    bind(StreamEventPublisherConfig.class)
+        .toInstance(
+            new StreamEventPublisherConfig(
+                EventTopic.STREAM_EVENT_TOPIC.topic(configuration),
+                configuration.broker().getStreamEventPublishTimeout()));
+
+    install(new StreamEventPublisherModule());
+
     OptionalBinder<ProjectVersionRefUpdate> projectVersionRefUpdateBinder =
         OptionalBinder.newOptionalBinder(binder(), ProjectVersionRefUpdate.class);
-    if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
-      DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdateImpl.class);
-      projectVersionRefUpdateBinder
-          .setBinding()
-          .to(ProjectVersionRefUpdateImpl.class)
-          .in(Scopes.SINGLETON);
+    if (configuration.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
+      DynamicSet.bind(binder(), GitBatchRefUpdateListener.class)
+          .to(ProjectVersionRefUpdateImpl.class);
+      projectVersionRefUpdateBinder.setBinding().to(ProjectVersionRefUpdateImpl.class);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
index dcfd1b0..e2e3954 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
@@ -51,11 +51,8 @@
    */
   public void dispatch(Event event) throws PermissionBackendException {
     try (ManualRequestContext ctx = oneOffCtx.open()) {
-      Context.setForwardedEvent(true);
       log.debug("dispatching event {}", event.getType());
       dispatcher.get().postEvent(event);
-    } finally {
-      Context.unsetForwardedEvent();
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
index ca17004..1d36e23 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
@@ -24,6 +24,5 @@
     DynamicSet.setOf(binder(), CacheEvictionForwarder.class);
     DynamicSet.setOf(binder(), IndexEventForwarder.class);
     DynamicSet.setOf(binder(), ProjectListUpdateForwarder.class);
-    DynamicSet.setOf(binder(), StreamEventForwarder.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
deleted file mode 100644
index 79a9af2..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder;
-
-import com.google.gerrit.server.events.Event;
-
-public interface StreamEventForwarder {
-  /**
-   * Forward a stream event to the other master.
-   *
-   * @param event the event to forward.
-   * @return true if successful, otherwise false.
-   */
-  boolean send(Event event);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 6bd6437..d6c5658 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
 
 public class BrokerForwarderModule extends LifecycleModule {
   @Override
@@ -28,6 +27,5 @@
     DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
     DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
         .to(BrokerProjectListUpdateForwarder.class);
-    DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
deleted file mode 100644
index 8a020fc..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-
-@Singleton
-public class BrokerStreamEventForwarder implements StreamEventForwarder {
-  private final BrokerApiWrapper broker;
-  private final Configuration cfg;
-
-  @Inject
-  BrokerStreamEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
-    this.broker = broker;
-    this.cfg = cfg;
-  }
-
-  @Override
-  public boolean send(Event event) {
-    return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 4e7a781..3255bcf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -21,7 +21,7 @@
   BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
   PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
-  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
+  STREAM_EVENT_TOPIC("gerrit", "streamEvent");
 
   private final String topic;
   private final String aliasKey;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 4ef5b7f..063d7bf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -14,10 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.index;
 
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.entities.HumanComment;
 import com.google.gerrit.exceptions.StorageException;
-import com.google.gerrit.server.CommentsUtil;
 import com.google.gerrit.server.change.ChangeFinder;
 import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.git.GitRepositoryManager;
@@ -42,7 +39,6 @@
 public class ChangeCheckerImpl implements ChangeChecker {
   private static final Logger log = LoggerFactory.getLogger(ChangeCheckerImpl.class);
   private final GitRepositoryManager gitRepoMgr;
-  private final CommentsUtil commentsUtil;
   private final OneOffRequestContext oneOffReqCtx;
   private final String changeId;
   private final ChangeFinder changeFinder;
@@ -57,14 +53,12 @@
   @Inject
   public ChangeCheckerImpl(
       GitRepositoryManager gitRepoMgr,
-      CommentsUtil commentsUtil,
       ChangeFinder changeFinder,
       OneOffRequestContext oneOffReqCtx,
       @GerritInstanceId String instanceId,
       @Assisted String changeId) {
     this.changeFinder = changeFinder;
     this.gitRepoMgr = gitRepoMgr;
-    this.commentsUtil = commentsUtil;
     this.oneOffReqCtx = oneOffReqCtx;
     this.changeId = changeId;
     this.instanceId = instanceId;
@@ -177,20 +171,7 @@
   }
 
   private Optional<Long> computeLastChangeTs() {
-    return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(notes));
-  }
-
-  private long getTsFromChangeAndDraftComments(ChangeNotes notes) {
-    Change change = notes.getChange();
-    Timestamp changeTs = change.getLastUpdatedOn();
-    try {
-      for (HumanComment comment : commentsUtil.draftByChange(changeNotes.get())) {
-        Timestamp commentTs = comment.writtenOn;
-        changeTs = commentTs.after(changeTs) ? commentTs : changeTs;
-      }
-    } catch (StorageException e) {
-      log.warn("Unable to access draft comments for change {}", change, e);
-    }
-    return changeTs.getTime() / 1000;
+    return getChangeNotes()
+        .map(notes -> Timestamp.from(notes.getChange().getLastUpdatedOn()).getTime() / 1000);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
new file mode 100644
index 0000000..d6598b9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import java.util.function.Consumer;
+
+@Singleton
+public class CurrentRequestContext {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private ThreadLocalRequestContext threadLocalCtx;
+  private Configuration cfg;
+  private OneOffRequestContext oneOffCtx;
+
+  @Inject
+  public CurrentRequestContext(
+      ThreadLocalRequestContext threadLocalCtx, Configuration cfg, OneOffRequestContext oneOffCtx) {
+    this.threadLocalCtx = threadLocalCtx;
+    this.cfg = cfg;
+    this.oneOffCtx = oneOffCtx;
+  }
+
+  public void onlyWithContext(Consumer<RequestContext> body) {
+    RequestContext ctx = threadLocalCtx.getContext();
+    if (ctx == null && !cfg.index().synchronizeForced()) {
+      logger.atFine().log("No context, skipping event (index.synchronizeForced is false)");
+      return;
+    }
+
+    if (ctx == null) {
+      try (ManualRequestContext manualCtx = oneOffCtx.open()) {
+        body.accept(manualCtx);
+      }
+    } else {
+      body.accept(ctx);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 02f1b1c..9aee56e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.index;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.base.Objects;
 import com.google.gerrit.extensions.events.AccountIndexedListener;
 import com.google.gerrit.extensions.events.ChangeIndexedListener;
@@ -47,39 +46,42 @@
   private final DynamicSet<IndexEventForwarder> forwarders;
   private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ChangeCheckerImpl.Factory changeChecker;
-  private final ProjectsFilter projectsFilter;
   private final GroupChecker groupChecker;
   private final String instanceId;
+  private final CurrentRequestContext currCtx;
 
   @Inject
   IndexEventHandler(
       @IndexExecutor Executor executor,
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
-      ProjectsFilter projectsFilter,
       GroupChecker groupChecker,
-      @GerritInstanceId String instanceId) {
+      @GerritInstanceId String instanceId,
+      CurrentRequestContext currCtx) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
-    this.projectsFilter = projectsFilter;
     this.groupChecker = groupChecker;
     this.instanceId = instanceId;
+    this.currCtx = currCtx;
   }
 
   @Override
   public void onAccountIndexed(int id) {
-    if (!Context.isForwardedEvent()) {
-      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
-      if (queuedTasks.add(task)) {
-        executor.execute(task);
-      }
-    }
+    currCtx.onlyWithContext(
+        (ctx) -> {
+          if (!Context.isForwardedEvent()) {
+            IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
+            if (queuedTasks.add(task)) {
+              executor.execute(task);
+            }
+          }
+        });
   }
 
   @Override
   public void onChangeIndexed(String projectName, int id) {
-    executeIndexChangeTask(projectName, id);
+    currCtx.onlyWithContext((ctx) -> executeIndexChangeTask(projectName, id));
   }
 
   @Override
@@ -101,7 +103,7 @@
 
   @Override
   public void onProjectIndexed(String projectName) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+    if (!Context.isForwardedEvent()) {
       IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
@@ -110,7 +112,7 @@
   }
 
   private void executeIndexChangeTask(String projectName, int id) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+    if (!Context.isForwardedEvent()) {
       ChangeChecker checker = changeChecker.create(projectName + "~" + id);
 
       try {
@@ -182,7 +184,9 @@
 
     @Override
     public String toString() {
-      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+      return String.format(
+          "Index change %s for project %s produced by instance %s",
+          changeIndexEvent.changeId, changeIndexEvent.projectName, changeIndexEvent.instanceId);
     }
   }
 
@@ -213,7 +217,9 @@
 
     @Override
     public String toString() {
-      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+      return String.format(
+          "Index change %s for project %s produced by instance %s",
+          changeIndexEvent.changeId, changeIndexEvent.projectName, changeIndexEvent.instanceId);
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
index d851420..3fc57ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
@@ -86,7 +86,7 @@
     } catch (IOException ioe) {
       String message = String.format("Error while opening project: '%s'", projectName);
       repLog.error(message);
-      logger.atSevere().withCause(ioe).log(message);
+      logger.atSevere().withCause(ioe).log("%s", message);
       return Collections.emptySet();
     }
   }
@@ -157,13 +157,13 @@
     } catch (GlobalRefDbLockException gle) {
       String message = String.format("%s is locked on shared-refdb", ref);
       repLog.error(message);
-      logger.atSevere().withCause(gle).log(message);
+      logger.atSevere().withCause(gle).log("%s", message);
       return Optional.empty();
     } catch (IOException ioe) {
       String message =
           String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
       repLog.error(message);
-      logger.atSevere().withCause(ioe).log(message);
+      logger.atSevere().withCause(ioe).log("%s", message);
       return Optional.empty();
     }
   }
@@ -194,7 +194,7 @@
       String message =
           String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
       repLog.error(message);
-      logger.atWarning().withCause(ie).log(message);
+      logger.atWarning().withCause(ie).log("%s", message);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 6f9c2e7..0aba056 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -102,9 +102,9 @@
           .collect(Collectors.toList());
 
     } catch (IOException ioe) {
-      String message = String.format("Error while opening project: '%s'", projectName);
-      repLog.error(message);
-      logger.atSevere().withCause(ioe).log(message);
+      final String messageFmt = "Error while opening project: '%s'";
+      repLog.error(messageFmt, projectName);
+      logger.atSevere().withCause(ioe).log(messageFmt, projectName);
       return Collections.emptyList();
     }
   }
@@ -132,16 +132,14 @@
           ? Optional.of(refUpdateReloaded)
           : Optional.empty();
     } catch (GlobalRefDbLockException gle) {
-      String message =
-          String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
-      repLog.error(message);
-      logger.atSevere().withCause(gle).log(message);
+      final String messageFmt = "%s is locked on shared-refdb and thus will NOT BE replicated";
+      repLog.error(messageFmt, ref);
+      logger.atSevere().withCause(gle).log(messageFmt, ref);
       return Optional.empty();
     } catch (IOException ioe) {
-      String message =
-          String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
-      repLog.error(message);
-      logger.atSevere().withCause(ioe).log(message);
+      final String messageFmt = "Error while extracting ref '%s' for project '%s'";
+      repLog.error(messageFmt, ref, projectName);
+      logger.atSevere().withCause(ioe).log(messageFmt, ref, projectName);
       return Optional.empty();
     }
   }
@@ -174,10 +172,9 @@
     try {
       Thread.sleep(randomSleepTimeMsec);
     } catch (InterruptedException ie) {
-      String message =
-          String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
-      repLog.error(message);
-      logger.atWarning().withCause(ie).log(message);
+      final String messageFmt = "Error while waiting for next check for '%s', ref '%s'";
+      repLog.error(messageFmt, projectName, ref);
+      logger.atWarning().withCause(ie).log(messageFmt, projectName, ref);
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
index e07fef0..150ea52 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
@@ -18,22 +18,18 @@
 import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -44,7 +40,9 @@
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 
-public class ProjectVersionRefUpdateImpl implements EventListener, ProjectVersionRefUpdate {
+@Singleton
+public class ProjectVersionRefUpdateImpl
+    implements GitBatchRefUpdateListener, ProjectVersionRefUpdate {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
       ImmutableSet.of(RefUpdate.Result.NEW, RefUpdate.Result.FORCED, RefUpdate.Result.NO_CHANGE);
@@ -52,7 +50,6 @@
   private final GitRepositoryManager gitRepositoryManager;
   private final GitReferenceUpdated gitReferenceUpdated;
   private final ProjectVersionLogger verLogger;
-  private final ProjectsFilter projectsFilter;
 
   protected final SharedRefDatabaseWrapper sharedRefDb;
 
@@ -61,63 +58,43 @@
       GitRepositoryManager gitRepositoryManager,
       SharedRefDatabaseWrapper sharedRefDb,
       GitReferenceUpdated gitReferenceUpdated,
-      ProjectVersionLogger verLogger,
-      ProjectsFilter projectsFilter) {
+      ProjectVersionLogger verLogger) {
     this.gitRepositoryManager = gitRepositoryManager;
     this.sharedRefDb = sharedRefDb;
     this.gitReferenceUpdated = gitReferenceUpdated;
     this.verLogger = verLogger;
-    this.projectsFilter = projectsFilter;
   }
 
   @Override
-  public void onEvent(Event event) {
-    logger.atFine().log("Processing event type: " + event.type);
+  public void onGitBatchRefUpdate(Event event) {
     // Producer of the Event use RefUpdatedEvent to trigger the version update
-    if (!Context.isForwardedEvent() && event instanceof RefUpdatedEvent) {
-      if (projectsFilter.matches(event)) {
-        updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
-      }
-    }
+    updateProducerProjectVersionUpdate(event);
   }
 
-  private boolean isSpecialRefName(String refName) {
-    return refName.startsWith(RefNames.REFS_SEQUENCES)
-        || refName.startsWith(RefNames.REFS_STARRED_CHANGES)
-        || refName.equals(MULTI_SITE_VERSIONING_REF);
-  }
-
-  private void updateProducerProjectVersionUpdate(RefUpdatedEvent refUpdatedEvent) {
-    String refName = refUpdatedEvent.getRefName();
-
-    if (isSpecialRefName(refName)) {
+  private void updateProducerProjectVersionUpdate(Event refUpdatedEvent) {
+    if (refUpdatedEvent.getRefNames().stream()
+        .allMatch(refName -> refName.equals(MULTI_SITE_VERSIONING_REF))) {
       logger.atFine().log(
           "Found a special ref name %s, skipping update for %s",
-          refName, refUpdatedEvent.getProjectNameKey().get());
+          MULTI_SITE_VERSIONING_REF, refUpdatedEvent.getProjectName());
       return;
     }
+
     try {
-      Project.NameKey projectNameKey = refUpdatedEvent.getProjectNameKey();
+      Project.NameKey projectNameKey = Project.nameKey(refUpdatedEvent.getProjectName());
       long newVersion = getCurrentGlobalVersionNumber();
 
-      Optional<RefUpdate> newProjectVersionRefUpdate =
-          updateLocalProjectVersion(projectNameKey, newVersion);
+      RefUpdate newProjectVersionRefUpdate = updateLocalProjectVersion(projectNameKey, newVersion);
 
-      if (newProjectVersionRefUpdate.isPresent()) {
-        verLogger.log(projectNameKey, newVersion, 0L);
+      verLogger.log(projectNameKey, newVersion, 0L);
 
-        if (updateSharedProjectVersion(projectNameKey, newVersion)) {
-          gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
-        }
-      } else {
-        logger.atWarning().log(
-            "Ref %s not found on projet %s: skipping project version update",
-            refUpdatedEvent.getRefName(), projectNameKey);
+      if (updateSharedProjectVersion(projectNameKey, newVersion)) {
+        gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate, null);
       }
     } catch (LocalProjectVersionUpdateException | SharedProjectVersionUpdateException e) {
       logger.atSevere().withCause(e).log(
-          "Issue encountered when updating version for project "
-              + refUpdatedEvent.getProjectNameKey());
+          "Issue encountered when updating version for project %s",
+          refUpdatedEvent.getProjectName());
     }
   }
 
@@ -136,6 +113,7 @@
     return newId;
   }
 
+  @SuppressWarnings("FloggerLogString")
   private boolean updateSharedProjectVersion(Project.NameKey projectNameKey, Long newVersion)
       throws SharedProjectVersionUpdateException {
 
@@ -147,15 +125,13 @@
     try {
       if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
         logger.atWarning().log(
-            String.format(
-                "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
-                projectNameKey.get(), newVersion, sharedVersion.get()));
+            "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
+            projectNameKey.get(), newVersion, sharedVersion.get());
         return false;
       }
 
       logger.atFine().log(
-          String.format(
-              "Updating shared project %s value to %d", projectNameKey.get(), newVersion));
+          "Updating shared project %s value to %d", projectNameKey.get(), newVersion);
 
       updateProjectVersionValue(projectNameKey, newVersion, sharedVersion);
       return true;
@@ -250,8 +226,8 @@
     }
   }
 
-  private Optional<RefUpdate> updateLocalProjectVersion(
-      Project.NameKey projectNameKey, long newVersionNumber)
+  @SuppressWarnings("FloggerLogString")
+  private RefUpdate updateLocalProjectVersion(Project.NameKey projectNameKey, long newVersionNumber)
       throws LocalProjectVersionUpdateException {
     logger.atFine().log(
         "Updating local version for project %s with version %d",
@@ -268,7 +244,7 @@
         throw new LocalProjectVersionUpdateException(message);
       }
 
-      return Optional.of(refUpdate);
+      return refUpdate;
     } catch (IOException e) {
       String message = "Cannot create versioning command for " + projectNameKey.get();
       logger.atSevere().withCause(e).log(message);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e451cb3..0485d5b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -47,6 +47,11 @@
     service such as ElasticSearch.
     Defaults to true.
 
+```index.synchronizeForced```
+:   Whether to synchronize forced index events. E.g. on-line reindex
+automatically triggered upon version upgrades.
+Defaults to true.
+
 ```index.threadPoolSize```
 :   Maximum number of threads used to send index events to the target instance.
     Defaults to 4.
@@ -67,9 +72,9 @@
 :   Name of the topic to use for publishing indexing events
     Defaults to GERRIT.EVENT.INDEX
 
-```broker.streamEventTopic```
-:   Name of the topic to use for publishing stream events
-    Defaults to GERRIT.EVENT.STREAM
+`broker.streamEventTopic`
+:   Name of the topic to use for publishing all stream events.
+    Default: gerrit
 
 ```broker.cacheEventTopic```
 :   Name of the topic to use for publishing cache eviction events
@@ -79,6 +84,10 @@
 :   Name of the topic to use for publishing cache eviction events
     Defaults to GERRIT.EVENT.PROJECT.LIST
 
+```broker.streamEventPublishTimeoutMs```
+:   The timeout in milliseconds for publishing stream events.
+    Defaults to 30000 (30 seconds).
+
 **NOTE**: All broker settings are ignored when all of the `cache`,
 `index` or `event` synchronization is disabled.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 0863f55..3a24773 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -21,7 +21,9 @@
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.DEFAULT_SYNCHRONIZE_FORCED;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.SYNCHRONIZE_FORCED_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
 
 import com.google.common.collect.ImmutableList;
@@ -129,4 +131,13 @@
     replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
     assertThat(new Configuration(globalPluginConfig, replicationConfig).validate()).isNotEmpty();
   }
+
+  @Test
+  public void testGetIndexSynchronizeForced() throws Exception {
+    assertThat(getConfiguration().index().synchronizeForced())
+        .isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
+
+    globalPluginConfig.setBoolean(INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, false);
+    assertThat(getConfiguration().index().synchronizeForced()).isFalse();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 7d1751c..50f55b2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -22,6 +22,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class BrokerApiWrapperTest {
+  private static final String DEFAULT_INSTANCE_ID = "instance-id";
   @Mock private BrokerMetrics brokerMetrics;
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
@@ -32,13 +33,14 @@
 
   @Before
   public void setUp() {
-    event.instanceId = "instance-id";
+    event.instanceId = DEFAULT_INSTANCE_ID;
     objectUnderTest =
         new BrokerApiWrapper(
             MoreExecutors.directExecutor(),
             DynamicItem.itemOf(BrokerApi.class, brokerApi),
             brokerMetrics,
-            msgLog);
+            msgLog,
+            DEFAULT_INSTANCE_ID);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index ce222d0..ed51e1b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.cache;
 
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 
 import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalNotification;
@@ -44,6 +44,6 @@
     handler.onRemoval(
         "test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
 
-    verifyZeroInteractions(executorMock);
+    verifyNoInteractions(executorMock);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index c8216bd..3a4242c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -20,10 +20,9 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
@@ -46,14 +45,12 @@
   private ProjectListUpdateHandler handler;
 
   @Mock private ProjectListUpdateForwarder forwarder;
-  @Mock private ProjectsFilter projectsFilter;
 
   @Before
   public void setUp() {
-    when(projectsFilter.matches(any(String.class))).thenReturn(true);
     handler =
         new ProjectListUpdateHandler(
-            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
+            asDynamicSet(forwarder), MoreExecutors.directExecutor(), INSTANCE_ID);
   }
 
   private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -93,12 +90,11 @@
     handler.onNewProjectCreated(mock(NewProjectCreatedListener.Event.class));
     handler.onProjectDeleted(mock(ProjectDeletedListener.Event.class));
     Context.unsetForwardedEvent();
-    verifyZeroInteractions(forwarder);
+    verifyNoInteractions(forwarder);
   }
 
   @Test
   public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
     String projectName = "projectToAdd";
     NewProjectCreatedListener.Event event = mock(NewProjectCreatedListener.Event.class);
     when(event.getProjectName()).thenReturn(projectName);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
index 0e63528..fd4a0aa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -128,6 +128,6 @@
         Change.id(CHANGE_ID),
         Account.id(9999),
         BranchNameKey.create(Project.nameKey(PROJECT_NAME), "refs/heads/master"),
-        TimeUtil.nowTs());
+        TimeUtil.now());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 6413b18..6a03a6c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -16,7 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.common.base.Suppliers;
@@ -140,7 +140,7 @@
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
-    verifyZeroInteractions(verLogger);
+    verifyNoInteractions(verLogger);
   }
 
   @Test
@@ -152,7 +152,7 @@
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
-    verifyZeroInteractions(verLogger);
+    verifyNoInteractions(verLogger);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
deleted file mode 100644
index a025dd0..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventExecutorProviderTest {
-  @Mock private ScheduledThreadPoolExecutor executorMock;
-  private EventExecutorProvider eventsExecutorProvider;
-
-  @Before
-  public void setUp() throws Exception {
-    WorkQueue workQueueMock = mock(WorkQueue.class);
-    when(workQueueMock.createQueue(1, "Forward-Stream-Event")).thenReturn(executorMock);
-    eventsExecutorProvider = new EventExecutorProvider(workQueueMock);
-  }
-
-  @Test
-  public void shouldReturnExecutor() throws Exception {
-    assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
-  }
-
-  @Test
-  public void testStop() throws Exception {
-    eventsExecutorProvider.start();
-    assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
-    eventsExecutorProvider.stop();
-    verify(executorMock).shutdown();
-    assertThat(eventsExecutorProvider.get()).isNull();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
deleted file mode 100644
index b9b416d..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.googlesource.gerrit.plugins.multisite.event.EventHandler.EventTask;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventHandlerTest {
-
-  private EventHandler eventHandler;
-
-  @Mock private StreamEventForwarder forwarder;
-  @Mock private ProjectsFilter projectsFilter;
-
-  @Before
-  public void setUp() {
-    when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(true);
-    eventHandler =
-        new EventHandler(asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
-  }
-
-  private DynamicSet<StreamEventForwarder> asDynamicSet(StreamEventForwarder forwarder) {
-    DynamicSet<StreamEventForwarder> result = new DynamicSet<>();
-    result.add("multi-site", forwarder);
-    return result;
-  }
-
-  @Test
-  public void shouldForwardAnyProjectEvent() throws Exception {
-    ProjectEvent event = mock(ProjectEvent.class);
-    eventHandler.onEvent(event);
-    verify(forwarder).send(event);
-  }
-
-  @Test
-  public void shouldNotForwardNonProjectEvent() throws Exception {
-    eventHandler.onEvent(mock(Event.class));
-    verifyZeroInteractions(forwarder);
-  }
-
-  @Test
-  public void shouldNotForwardIfAlreadyForwardedEvent() throws Exception {
-    Context.setForwardedEvent(true);
-    eventHandler.onEvent(mock(ProjectEvent.class));
-    Context.unsetForwardedEvent();
-    verifyZeroInteractions(forwarder);
-  }
-
-  @Test
-  public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(false);
-
-    ProjectEvent event = mock(ProjectEvent.class);
-
-    eventHandler.onEvent(event);
-    verify(forwarder, never()).send(event);
-  }
-
-  @Test
-  public void tesEventTaskToString() throws Exception {
-    Event event = new RefUpdatedEvent();
-    EventTask task = eventHandler.new EventTask(event);
-    assertThat(task.toString())
-        .isEqualTo(String.format("Send event '%s' to target instance", event.type));
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 413b6c1..4ffdbbc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -16,7 +16,7 @@
 
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 
 import com.google.gerrit.entities.Account;
 import com.google.gerrit.server.config.AllUsersName;
@@ -73,7 +73,7 @@
     verify(indexAccountHandler)
         .indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
 
-    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+    verifyNoInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
@@ -87,7 +87,7 @@
     verify(indexAccountHandler)
         .indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
 
-    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+    verifyNoInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
 
     streamEventRouter.route(new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1));
 
@@ -103,7 +103,7 @@
     verify(indexGroupHandler)
         .index(groupId, ForwardedIndexingHandler.Operation.INDEX, Optional.of(event));
 
-    verifyZeroInteractions(indexAccountHandler, indexChangeHandler, indexProjectHandler);
+    verifyNoInteractions(indexAccountHandler, indexChangeHandler, indexProjectHandler);
   }
 
   @Test
@@ -115,7 +115,7 @@
     verify(indexProjectHandler)
         .index(projectName, ForwardedIndexingHandler.Operation.INDEX, Optional.of(event));
 
-    verifyZeroInteractions(indexAccountHandler, indexChangeHandler, indexGroupHandler);
+    verifyNoInteractions(indexAccountHandler, indexChangeHandler, indexGroupHandler);
   }
 
   @Test
@@ -129,7 +129,7 @@
             ForwardedIndexingHandler.Operation.INDEX,
             Optional.of(event));
 
-    verifyZeroInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
+    verifyNoInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
@@ -143,7 +143,7 @@
             ForwardedIndexingHandler.Operation.DELETE,
             Optional.of(event));
 
-    verifyZeroInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
+    verifyNoInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
@@ -151,7 +151,7 @@
     final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
 
     assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
-    verifyZeroInteractions(
+    verifyNoInteractions(
         indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 3b4bec5..ac90436 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -55,6 +55,6 @@
         Change.id(1),
         Account.id(1),
         BranchNameKey.create("proj", "refs/heads/master"),
-        TimeUtil.nowTs());
+        TimeUtil.now());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index 28bf56d..c583455 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -16,7 +16,7 @@
 
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
@@ -99,21 +99,21 @@
   @Test
   public void shouldSkipEventFromHighAvailabilityPluginThread() {
     brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_PLUGIN), testTopic, testEvent);
-    verifyZeroInteractions(brokerMock);
+    verifyNoInteractions(brokerMock);
   }
 
   @Test
   public void shouldSkipEventFromHighAvailabilityPluginForwardedThread() {
     brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_FORWARDED), testTopic, testEvent);
 
-    verifyZeroInteractions(brokerMock);
+    verifyNoInteractions(brokerMock);
   }
 
   @Test
   public void shouldSkipEventFromHighAvailabilityPluginBatchForwardedThread() {
     brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_BATCH_FORWARDED), testTopic, testEvent);
 
-    verifyZeroInteractions(brokerMock);
+    verifyNoInteractions(brokerMock);
   }
 
   private ForwarderTask newForwarderTask(String threadName) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index 75596ed..6ca1a7a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -29,10 +29,13 @@
 import com.google.gerrit.extensions.registration.RegistrationHandle;
 import com.google.gerrit.server.cache.CacheRemovalListener;
 import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCacheImpl;
 import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
@@ -43,7 +46,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.eclipse.jgit.lib.Config;
 import org.junit.After;
 import org.junit.Before;
@@ -69,7 +75,7 @@
     @Override
     protected void configure() {
       install(new ForwarderModule());
-      install(new CacheModule());
+      install(new CacheModule(TestForwardingExecutorProvider.class));
       install(new RouterModule());
       install(new IndexModule());
       SharedRefDbConfiguration sharedRefDbConfig =
@@ -78,11 +84,44 @@
     }
   }
 
+  @Singleton
+  public static class TestForwardingExecutorProvider extends ExecutorProvider {
+    private final ScheduledThreadPoolExecutor executor;
+    private final AtomicInteger executionsCounter;
+
+    @Inject
+    protected TestForwardingExecutorProvider(WorkQueue workQueue) {
+      super(workQueue, 1, "test");
+      executionsCounter = new AtomicInteger();
+      executor =
+          new ScheduledThreadPoolExecutor(1) {
+
+            @Override
+            public void execute(Runnable command) {
+              @SuppressWarnings("unused")
+              int ignored = executionsCounter.incrementAndGet();
+              super.execute(command);
+            }
+          };
+    }
+
+    @Override
+    public ScheduledExecutorService get() {
+      return executor;
+    }
+
+    public int executions() {
+      return executionsCounter.get();
+    }
+  }
+
   public static class CacheEvictionsTracker<K, V> implements CacheRemovalListener<K, V> {
     private final Map<String, Set<Object>> trackedEvictions;
     private final CountDownLatch allExpectedEvictionsArrived;
+    private final String trackedCacheName;
 
-    public CacheEvictionsTracker(int numExpectedEvictions) {
+    public CacheEvictionsTracker(String cacheName, int numExpectedEvictions) {
+      this.trackedCacheName = cacheName;
       allExpectedEvictionsArrived = new CountDownLatch(numExpectedEvictions);
       trackedEvictions = Maps.newHashMap();
     }
@@ -99,22 +138,24 @@
     @Override
     public void onRemoval(
         String pluginName, String cacheName, RemovalNotification<K, V> notification) {
-      trackedEvictions.compute(
-          cacheName,
-          (k, v) -> {
-            if (v == null) {
-              return Sets.newHashSet(notification.getKey());
-            }
-            v.add(notification.getKey());
-            return v;
-          });
-      allExpectedEvictionsArrived.countDown();
+      if (cacheName.equals(trackedCacheName)) {
+        trackedEvictions.compute(
+            cacheName,
+            (k, v) -> {
+              if (v == null) {
+                return Sets.newHashSet(notification.getKey());
+              }
+              v.add(notification.getKey());
+              return v;
+            });
+        allExpectedEvictionsArrived.countDown();
+      }
     }
   }
 
   @Before
   public void startTrackingCacheEvictions() {
-    evictionsCacheTracker = new CacheEvictionsTracker<>(1);
+    evictionsCacheTracker = new CacheEvictionsTracker<>(ProjectCacheImpl.CACHE_NAME, 1);
     cacheEvictionRegistrationHandle = cacheRemovalListeners.add("gerrit", evictionsCacheTracker);
   }
 
@@ -135,6 +176,36 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  @GerritConfig(name = "cache.threads", value = "0")
+  public void shouldNotForwardProjectCacheEvictionsWhenEventIsForwarded() throws Exception {
+    TestForwardingExecutorProvider cacheForwarder =
+        plugin.getSysInjector().getInstance(TestForwardingExecutorProvider.class);
+    Context.setForwardedEvent(true);
+    projectCache.evict(allProjects);
+
+    evictionsCacheTracker.waitForExpectedEvictions();
+    assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
+        .contains(allProjects);
+
+    assertThat(cacheForwarder.executions()).isEqualTo(0);
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+  public void shouldForwardProjectCacheEvictions() throws Exception {
+    TestForwardingExecutorProvider cacheForwarder =
+        plugin.getSysInjector().getInstance(TestForwardingExecutorProvider.class);
+    projectCache.evict(allProjects);
+
+    evictionsCacheTracker.waitForExpectedEvictions();
+    assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
+        .contains(allProjects);
+
+    assertThat(cacheForwarder.executions()).isEqualTo(1);
+  }
+
+  @Test
   @GerritConfig(name = "gerrit.instanceId", value = "instance-id")
   public void shouldEvictProjectCacheWithSlash() throws Exception {
     ProjectInput in = new ProjectInput();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
index 704401b..73ef353 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
@@ -14,13 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -33,7 +29,6 @@
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedEventHandlerTest {
@@ -56,45 +51,4 @@
     handler.dispatch(event);
     verify(dispatcherMock).postEvent(event);
   }
-
-  @Test
-  public void shouldSetAndUnsetForwardedContext() throws Exception {
-    Event event = new ProjectCreatedEvent();
-    // this doAnswer is to allow to assert that context is set to forwarded
-    // while cache eviction is called.
-    doAnswer(
-            (Answer<Void>)
-                invocation -> {
-                  assertThat(Context.isForwardedEvent()).isTrue();
-                  return null;
-                })
-        .when(dispatcherMock)
-        .postEvent(event);
-
-    assertThat(Context.isForwardedEvent()).isFalse();
-    handler.dispatch(event);
-    assertThat(Context.isForwardedEvent()).isFalse();
-
-    verify(dispatcherMock).postEvent(event);
-  }
-
-  @Test
-  public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
-    Event event = new ProjectCreatedEvent();
-    doAnswer(
-            (Answer<Void>)
-                invocation -> {
-                  assertThat(Context.isForwardedEvent()).isTrue();
-                  throw new StorageException("someMessage");
-                })
-        .when(dispatcherMock)
-        .postEvent(event);
-
-    assertThat(Context.isForwardedEvent()).isFalse();
-    StorageException thrown = assertThrows(StorageException.class, () -> handler.dispatch(event));
-    assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
-    assertThat(Context.isForwardedEvent()).isFalse();
-
-    verify(dispatcherMock).postEvent(event);
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index b827cdf..2c4b157 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -87,7 +87,7 @@
   public void setUp() throws Exception {
     when(ctxMock.open()).thenReturn(manualRequestContextMock);
     id = Change.id(TEST_CHANGE_NUMBER);
-    change = new Change(null, id, null, null, TimeUtil.nowTs());
+    change = new Change(null, id, null, null, TimeUtil.now());
     when(changeNotes.getChange()).thenReturn(change);
     when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
     when(configurationMock.index()).thenReturn(index);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 8cf6ea9..7e27eeb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -15,23 +15,26 @@
 package com.googlesource.gerrit.plugins.multisite.index;
 
 import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.index.IndexEventHandler.IndexProjectTask;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,23 +45,34 @@
 public class IndexEventHandlerTest {
 
   private static final String INSTANCE_ID = "instance-id";
+  private static final String PROJECT_NAME = "test_project";
+  private static final int CHANGE_ID = 1;
 
   private IndexEventHandler eventHandler;
 
-  @Mock private ProjectsFilter projectsFilter;
   @Mock private IndexEventForwarder forwarder;
   @Mock private ChangeCheckerImpl.Factory changeChecker;
+  @Mock private ChangeChecker changeCheckerMock;
+  @Mock private RequestContext mockCtx;
+
+  private CurrentRequestContext currCtx =
+      new CurrentRequestContext(null, null, null) {
+        @Override
+        public void onlyWithContext(Consumer<RequestContext> body) {
+          body.accept(mockCtx);
+        }
+      };
 
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     eventHandler =
         new IndexEventHandler(
             MoreExecutors.directExecutor(),
             asDynamicSet(forwarder),
             changeChecker,
-            projectsFilter,
             new TestGroupChecker(true),
-            INSTANCE_ID);
+            INSTANCE_ID,
+            currCtx);
   }
 
   private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -68,41 +82,59 @@
   }
 
   @Test
-  public void shouldNotForwardProjectIndexedIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
-    eventHandler.onProjectIndexed("test_project");
-    verify(forwarder, never())
-        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
-  }
-
-  @Test
-  public void shouldNotForwardIndexChangeIfFilteredOutByProjectName() throws Exception {
-    int changeId = 1;
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
-    eventHandler.onChangeIndexed("test_project", changeId);
-    verifyZeroInteractions(changeChecker);
-  }
-
-  @Test
   public void shouldNotForwardIndexChangeIfCurrentThreadIsPullReplicationApplyObject()
       throws Exception {
     String currentThreadName = Thread.currentThread().getName();
     try {
       Thread.currentThread().setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
-      int changeId = 1;
       Context.setForwardedEvent(false);
-      lenient().when(projectsFilter.matches(any(String.class))).thenReturn(true);
       lenient()
           .when(changeChecker.create(anyString()))
           .thenThrow(
               new IllegalStateException("Change indexing event should have not been triggered"));
 
-      eventHandler.onChangeIndexed("test_project", changeId);
+      eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
       verifyNoInteractions(changeChecker);
     } finally {
       Thread.currentThread().setName(currentThreadName);
     }
   }
+
+  @Test
+  public void shouldNotForwardIndexChangeWhenContextIsMissingAndForcedIndexingDisabled()
+      throws Exception {
+    eventHandler = createIndexEventHandler(changeChecker, false);
+    eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+    verifyNoInteractions(changeChecker);
+    verifyNoInteractions(forwarder);
+  }
+
+  @Test
+  public void shouldForwardIndexChangeWhenContextIsMissingAndForcedIndexingEnabled()
+      throws Exception {
+    when(changeChecker.create(any())).thenReturn(changeCheckerMock);
+    when(changeCheckerMock.newIndexEvent(PROJECT_NAME, CHANGE_ID, false))
+        .thenReturn(Optional.of(new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, false, INSTANCE_ID)));
+    eventHandler = createIndexEventHandler(changeChecker, true);
+    eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+    verify(changeCheckerMock).newIndexEvent(PROJECT_NAME, CHANGE_ID, false);
+    verify(forwarder).index(any(), any());
+  }
+
+  private IndexEventHandler createIndexEventHandler(
+      ChangeCheckerImpl.Factory changeChecker, boolean synchronizeForced) {
+    ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
+    OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
+    Configuration cfgMock = mock(Configuration.class);
+    Configuration.Index cfgIndex = mock(Configuration.Index.class);
+    when(cfgMock.index()).thenReturn(cfgIndex);
+    when(cfgIndex.synchronizeForced()).thenReturn(synchronizeForced);
+    return new IndexEventHandler(
+        MoreExecutors.directExecutor(),
+        asDynamicSet(forwarder),
+        changeChecker,
+        new TestGroupChecker(true),
+        INSTANCE_ID,
+        new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index d99fb4b..e2e3113 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -24,29 +24,25 @@
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
-import com.google.common.base.Suppliers;
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.data.RefUpdateAttribute;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.project.ProjectConfig;
 import com.google.gerrit.testing.InMemoryRepositoryManager;
 import com.google.gerrit.testing.InMemoryTestEnvironment;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
+import java.util.Set;
 import org.eclipse.jgit.errors.LargeObjectException;
 import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
 import org.eclipse.jgit.junit.TestRepository;
@@ -54,7 +50,7 @@
 import org.eclipse.jgit.lib.ObjectLoader;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.junit.After;
+import org.eclipse.jgit.transport.ReceiveCommand;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -65,24 +61,26 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ProjectVersionRefUpdateTest implements RefFixture {
 
+  private static final int A_TEST_ACCOUNT_ID = 1;
+  private static final GitReferenceUpdated.UpdatedRef A_TEST_UPDATED_REF =
+      new GitReferenceUpdated.UpdatedRef(
+          A_TEST_REF_NAME, ObjectId.zeroId(), ObjectId.zeroId(), ReceiveCommand.Type.UPDATE);
+
   @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
 
-  @Mock RefUpdatedEvent refUpdatedEvent;
+  @Mock GitBatchRefUpdateListener.Event refUpdatedEvent;
   @Mock SharedRefDatabaseWrapper sharedRefDb;
   @Mock GitReferenceUpdated gitReferenceUpdated;
   @Mock ProjectVersionLogger verLogger;
-  @Mock ProjectsFilter projectsFilter;
 
   @Inject private ProjectConfig.Factory projectConfigFactory;
   @Inject private InMemoryRepositoryManager repoManager;
-
   private TestRepository<InMemoryRepository> repo;
   private ProjectConfig project;
   private RevCommit masterCommit;
 
   @Before
   public void setUp() throws Exception {
-    when(projectsFilter.matches(any(Event.class))).thenReturn(true);
     InMemoryRepository inMemoryRepo = repoManager.createRepository(A_TEST_PROJECT_NAME_KEY);
     project = projectConfigFactory.create(A_TEST_PROJECT_NAME_KEY);
     project.load(inMemoryRepo);
@@ -90,14 +88,8 @@
     masterCommit = repo.branch("master").commit().create();
   }
 
-  @After
-  public void tearDown() {
-    Context.unsetForwardedEvent();
-  }
-
   @Test
   public void producerShouldUpdateProjectVersionUponRefUpdatedEvent() throws IOException {
-    Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
@@ -105,12 +97,11 @@
         .thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
-    when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
-    when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+    when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+    when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
 
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
@@ -127,18 +118,47 @@
   }
 
   @Test
+  public void producerShouldUpdateProjectVersionOnceUponMultipleRefUpdatedEvent() {
+    when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
+        .thenReturn(true);
+    when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+    when(refUpdatedEvent.getRefNames())
+        .thenReturn(Set.of(A_TEST_REF_NAME, A_REF_NAME_OF_A_PATCHSET));
+
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
+
+    verify(sharedRefDb, atMost(1))
+        .compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
+  }
+
+  @Test
+  public void producerShouldUpdateProjectVersionOnceUponMultipleRefWithOneMetaUpdatedEvent() {
+    when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
+        .thenReturn(true);
+    when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+    when(refUpdatedEvent.getRefNames())
+        .thenReturn(Set.of(A_TEST_REF_NAME, MULTI_SITE_VERSIONING_REF));
+
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
+
+    verify(sharedRefDb, atMost(1))
+        .compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
+  }
+
+  @Test
   public void producerShouldUsePutInsteadOfCompareAndPutWhenExtendedGlobalRefDb()
       throws IOException {
     when(sharedRefDb.isSetOperationSupported()).thenReturn(true);
-    RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
-    RefUpdateAttribute refUpdatedAttribute = new RefUpdateAttribute();
-    refUpdatedAttribute.project = A_TEST_PROJECT_NAME_KEY.get();
-    refUpdatedAttribute.refName = A_TEST_REF_NAME;
-    refUpdatedEvent.refUpdate = Suppliers.memoize(() -> refUpdatedAttribute);
+    GitBatchRefUpdateListener.Event refUpdatedEvent =
+        new GitReferenceUpdated.GitBatchRefUpdateEvent(
+            A_TEST_PROJECT_NAME_KEY,
+            Set.of(A_TEST_UPDATED_REF),
+            new AccountInfo(A_TEST_ACCOUNT_ID));
 
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
@@ -157,8 +177,6 @@
 
   @Test
   public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
-    Context.setForwardedEvent(false);
-
     Thread.sleep(1000L);
     RevCommit masterPlusOneCommit = repo.branch("master").commit().create();
 
@@ -171,13 +189,12 @@
         .thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
-    when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
-    when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+    when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+    when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
 
     ProjectVersionRefUpdateImpl projectVersion =
-        new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter);
-    projectVersion.onEvent(refUpdatedEvent);
+        new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger);
+    projectVersion.onGitBatchRefUpdate(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
@@ -200,7 +217,6 @@
   @Test
   public void producerShouldCreateNewProjectVersionWhenMissingUponRefUpdatedEvent()
       throws IOException {
-    Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
@@ -209,12 +225,11 @@
 
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
-    when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
-    when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+    when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+    when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
 
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
@@ -230,55 +245,24 @@
     verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
   }
 
-  @Test
-  public void producerShouldNotUpdateProjectVersionUponSequenceRefUpdatedEvent() throws Exception {
-    producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(RefNames.REFS_SEQUENCES);
-  }
-
-  @Test
-  public void producerShouldNotUpdateProjectVersionUponStarredChangesRefUpdatedEvent()
-      throws Exception {
-    producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(RefNames.REFS_STARRED_CHANGES);
-  }
-
   private long readLongObject(ObjectLoader loader)
       throws LargeObjectException, UnsupportedEncodingException {
     String boutString = new String(loader.getBytes(), StandardCharsets.UTF_8.name());
     return Long.parseLong(boutString);
   }
 
-  private void producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(String magicRefPrefix)
-      throws Exception {
-    String magicRefName = magicRefPrefix + "/foo";
-    Context.setForwardedEvent(false);
-    when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
-    when(refUpdatedEvent.getRefName()).thenReturn(magicRefName);
-    repo.branch(magicRefName).commit().create();
-
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
-
-    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
-    assertThat(ref).isNull();
-
-    verifyZeroInteractions(verLogger);
-  }
-
   @Test
   public void shouldNotUpdateProjectVersionWhenProjectDoesntExist() throws IOException {
-    Context.setForwardedEvent(false);
-    when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject"));
-    when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+    when(refUpdatedEvent.getProjectName()).thenReturn("aNonExistentProject");
+    when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
 
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
+    new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+        .onGitBatchRefUpdate(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
     assertThat(ref).isNull();
 
-    verifyZeroInteractions(verLogger);
+    verifyNoInteractions(verLogger);
   }
 
   @Test
@@ -287,8 +271,7 @@
         .thenReturn(Optional.of("123"));
 
     Optional<Long> version =
-        new ProjectVersionRefUpdateImpl(
-                repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+        new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
             .getProjectRemoteVersion(A_TEST_PROJECT_NAME);
 
     assertThat(version.isPresent()).isTrue();