Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Inject ProjectVersionRefUpdate as Provider<>

Change-Id: I52f184253c02e87358d662526461a67bb0b71b7f
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index 49615e7..567e824 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,11 +16,11 @@
 
 LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
-DEF_GERRIT_IMAGE=3.3.6-centos8
+DEF_GERRIT_IMAGE=3.4.0-centos8
 DEF_GERRIT_HEALTHCHECK_START_PERIOD=60s
 DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
 DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
@@ -64,7 +64,7 @@
 
   export BROKER_HOST=$2
   export BROKER_PORT=$3
-  export GROUP_ID=$4
+  export INSTANCE_ID=$4
   export SSH_ADVERTISED_PORT=$5
   export LOCATION_TEST_SITE=/var/gerrit
   export REMOTE_DEBUG_PORT=5005
@@ -228,7 +228,7 @@
   { echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
 
 echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
   -O $COMMON_PLUGINS/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Aborting"; exit 1; }
 
 echo "Downloading healthcheck plugin $GERRIT_BRANCH"
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 223778e..b91d0d7 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 2cd8c9d..8a176f2 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -5,6 +5,7 @@
     installModule = com.gerritforge.gerrit.eventbroker.BrokerApiModule # events-broker module to setup BrokerApi dynamic item
     installModule = com.googlesource.gerrit.plugins.multisite.Module # multi-site needs to be a gerrit lib
     installDbModule = com.googlesource.gerrit.plugins.multisite.GitModule
+    instanceId = $INSTANCE_ID
 [database]
     type = h2
     database = $LOCATION_TEST_SITE/db/ReviewDB
@@ -43,7 +44,7 @@
 [plugin "events-kafka"]
     sendAsync = true
     bootstrapServers = $BROKER_HOST:$BROKER_PORT
-    groupId = $GROUP_ID
+    groupId = $INSTANCE_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
@@ -55,12 +56,12 @@
     pollingIntervalMs = 1000
     region = us-east-1
     endpoint = http://localhost:$BROKER_PORT
-    applicationName = $GROUP_ID
+    applicationName = $INSTANCE_ID
     initialPosition = trim_horizon
 [plugin "events-gcloud-pubsub"]
     numberOfSubscribers = 6
     gcloudProject="test-project"
-    subscriptionId=$GROUP_ID
+    subscriptionId=$INSTANCE_ID
     privateKeyLocation="not used in local mode"
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 5342fbf..4fee129 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,7 +16,7 @@
 
 
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
@@ -70,7 +70,7 @@
     export GERRIT_HOSTNAME=$7
     export REPLICATION_HOSTNAME=$8
     export REMOTE_DEBUG_PORT=$9
-    export GROUP_ID=${10}
+    export INSTANCE_ID=${10}
     export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -126,21 +126,21 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_GROUP_ID="instance-1"
+  GERRIT_SITE1_INSTANCE_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_GROUP_ID="instance-2"
+  GERRIT_SITE2_INSTANCE_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_INSTANCE_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_INSTANCE_ID
 }
 
 function is_docker_desktop {
@@ -415,7 +415,7 @@
 fi
 if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
   echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+  wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
   -O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
 ing"; exit 1; }
   wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
@@ -449,7 +449,7 @@
 
 if [ "$BROKER_TYPE" = "kinesis" ]; then
 echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
+  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
   -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
 ing"; exit 1; }
 fi
@@ -457,7 +457,7 @@
 
 if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
 echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
+  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
   -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
 ing"; exit 1; }
 fi
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index 7c88655..95c417f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,9 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.util.PluginLogFile;
 import com.google.gerrit.server.util.SystemLog;
 import com.google.gson.Gson;
@@ -41,7 +41,7 @@
   }
 
   @Override
-  public void log(Direction direction, String topic, EventMessage event) {
+  public void log(Direction direction, String topic, Event event) {
     msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
index c2c4b46..23c720a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
@@ -45,4 +45,9 @@
       verLog.info("{ \"project\":\"{}\", \"version\":{} }", projectName, currentVersion);
     }
   }
+
+  @Override
+  public void logDeleted(Project.NameKey projectName) {
+    verLog.info("{ \"project\":\"{}\", \"status\":\"DELETED\" }", projectName);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index b1f3e79..cc64b02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface MessageLogger {
 
@@ -23,5 +23,5 @@
     CONSUME;
   }
 
-  public void log(Direction direction, String topic, EventMessage event);
+  public void log(Direction direction, String topic, Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index f44f4f7..ad53449 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -16,30 +16,18 @@
 
 import com.gerritforge.gerrit.globalrefdb.validation.LibModule;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.CreationException;
 import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Module extends LifecycleModule {
-  private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
 
   @Inject
@@ -72,57 +60,8 @@
       install(new IndexModule());
     }
 
+    install(new BrokerModule());
+
     install(new RouterModule());
   }
-
-  @Provides
-  @Singleton
-  @InstanceId
-  UUID getInstanceId(SitePaths sitePaths) throws IOException {
-    UUID instanceId = null;
-    Path dataDir = sitePaths.data_dir.resolve(Configuration.PLUGIN_NAME);
-    if (!dataDir.toFile().exists()) {
-      dataDir.toFile().mkdirs();
-    }
-    String serverIdFile =
-        dataDir.toAbsolutePath().toString() + "/" + Configuration.INSTANCE_ID_FILE;
-
-    instanceId = tryToLoadSavedInstanceId(serverIdFile);
-
-    if (instanceId == null) {
-      instanceId = UUID.randomUUID();
-      Files.createFile(Paths.get(serverIdFile));
-      try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(serverIdFile))) {
-        writer.write(instanceId.toString());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot write instance ID, a new one will be generated at instance restart. (%s)",
-                e.getMessage()));
-      }
-    }
-    return instanceId;
-  }
-
-  private UUID tryToLoadSavedInstanceId(String serverIdFile) {
-    if (Files.exists(Paths.get(serverIdFile))) {
-      try (BufferedReader br = Files.newBufferedReader(Paths.get(serverIdFile))) {
-        return UUID.fromString(br.readLine());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot read instance ID from path '%s', deleting the old file and generating a new ID: (%s)",
-                serverIdFile, e.getMessage()));
-        try {
-          Files.delete(Paths.get(serverIdFile));
-        } catch (IOException e1) {
-          log.warn(
-              String.format(
-                  "Cannot delete old instance ID file at path '%s' with instance ID while generating a new one: (%s)",
-                  serverIdFile, e1.getMessage()));
-        }
-      }
-    }
-    return null;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
index 6ababb6..2ee2c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
@@ -19,4 +19,6 @@
 public interface ProjectVersionLogger {
 
   public void log(Project.NameKey projectName, long currentVersion, long replicationLag);
+
+  public void logDeleted(Project.NameKey projectName);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 71be5e6..f58efa7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -15,62 +15,101 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
-import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BrokerApiWrapper implements BrokerApi {
+  private static final Logger log = LoggerFactory.getLogger(BrokerApiWrapper.class);
+  private final Executor executor;
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
-  private final UUID instanceId;
 
   @Inject
   public BrokerApiWrapper(
+      @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
-      MessageLogger msgLog,
-      @InstanceId UUID instanceId) {
+      MessageLogger msgLog) {
     this.apiDelegate = apiDelegate;
+    this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
-    this.instanceId = instanceId;
   }
 
-  public boolean send(String topic, Event event) {
-    return send(topic, apiDelegate.get().newMessage(instanceId, event));
-  }
-
-  @Override
-  public boolean send(String topic, EventMessage message) {
-    if (Context.isForwardedEvent()) {
-      return true;
-    }
-    boolean succeeded = false;
+  public boolean sendSync(String topic, Event event) {
     try {
-      succeeded = apiDelegate.get().send(topic, message);
-    } finally {
-      if (succeeded) {
-        msgLog.log(Direction.PUBLISH, topic, message);
-        metrics.incrementBrokerPublishedMessage();
-      } else {
-        metrics.incrementBrokerFailedToPublishMessage();
-      }
+      return send(topic, event).get();
+    } catch (Throwable e) {
+      log.error(
+          "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
+          event,
+          topic,
+          e.getMessage(),
+          e.getStackTrace());
+      metrics.incrementBrokerFailedToPublishMessage();
+      return false;
     }
-    return succeeded;
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+  public ListenableFuture<Boolean> send(String topic, Event message) {
+    SettableFuture<Boolean> resultFuture = SettableFuture.create();
+    if (Context.isForwardedEvent()) {
+      resultFuture.set(true);
+      return resultFuture;
+    }
+
+    if (Strings.isNullOrEmpty(message.instanceId)) {
+      log.warn(
+          "Dropping event '{}' because event instance id cannot be null or empty",
+          message.toString());
+      resultFuture.set(true);
+      return resultFuture;
+    }
+
+    ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
+    Futures.addCallback(
+        resfultF,
+        new FutureCallback<Boolean>() {
+          @Override
+          public void onSuccess(Boolean result) {
+            msgLog.log(Direction.PUBLISH, topic, message);
+            metrics.incrementBrokerPublishedMessage();
+          }
+
+          @Override
+          public void onFailure(Throwable throwable) {
+            log.error(
+                "Failed to publish message '{}' to topic '{}' - error: {}",
+                message.toString(),
+                topic,
+                throwable.getMessage());
+            metrics.incrementBrokerFailedToPublishMessage();
+          }
+        },
+        executor);
+
+    return resfultF;
+  }
+
+  @Override
+  public void receiveAsync(String topic, Consumer<Event> messageConsumer) {
     apiDelegate.get().receiveAsync(topic, messageConsumer);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
similarity index 83%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
index 87306a2..aa24eb1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.broker;
 
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
@@ -21,4 +21,4 @@
 
 @Retention(RUNTIME)
 @BindingAnnotation
-public @interface InstanceId {}
+@interface BrokerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
new file mode 100644
index 0000000..e843263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
+
+@Singleton
+class BrokerExecutorProvider extends ExecutorProvider {
+
+  @Inject
+  BrokerExecutorProvider(WorkQueue workQueue) {
+    super(workQueue, 1, "Multi-Site-Broker");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..a5dac4a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import java.util.concurrent.Executor;
+
+public class BrokerModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    bind(Executor.class)
+        .annotatedWith(BrokerExecutor.class)
+        .toProvider(BrokerExecutorProvider.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 2990264..e418da5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -17,6 +17,7 @@
 import com.google.common.cache.RemovalNotification;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -28,21 +29,25 @@
   private final Executor executor;
   private final DynamicSet<CacheEvictionForwarder> forwarders;
   private final CachePatternMatcher matcher;
+  private final String instanceId;
 
   @Inject
   CacheEvictionHandler(
       DynamicSet<CacheEvictionForwarder> forwarders,
       @CacheExecutor Executor executor,
-      CachePatternMatcher matcher) {
+      CachePatternMatcher matcher,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.matcher = matcher;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onRemoval(String plugin, String cache, RemovalNotification<K, V> notification) {
     if (!Context.isForwardedEvent() && !notification.wasEvicted() && matcher.matches(cache)) {
-      executor.execute(new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey())));
+      executor.execute(
+          new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey(), instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
index b8521a3..2dcb09a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
@@ -25,7 +25,7 @@
 
 @Singleton
 class CachePatternMatcher {
-  private static final List<String> DEFAULT_PATTERNS =
+  private static final ImmutableList<String> DEFAULT_PATTERNS =
       ImmutableList.of("^groups.*", "ldap_groups", "ldap_usernames", "projects", "sshkeys");
 
   private final Pattern pattern;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index fdc6fc3..44f8417 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.events.ProjectEvent;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -33,15 +34,18 @@
   private final DynamicSet<ProjectListUpdateForwarder> forwarders;
   private final Executor executor;
   private final ProjectsFilter projectsFilter;
+  private final String instanceId;
 
   @Inject
   public ProjectListUpdateHandler(
       DynamicSet<ProjectListUpdateForwarder> forwarders,
       @CacheExecutor Executor executor,
-      ProjectsFilter filter) {
+      ProjectsFilter filter,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.projectsFilter = filter;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -59,7 +63,8 @@
   private void process(ProjectEvent event, boolean delete) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
       executor.execute(
-          new ProjectListUpdateTask(new ProjectListUpdateEvent(event.getProjectName(), delete)));
+          new ProjectListUpdateTask(
+              new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index b37f434..7107b87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,20 +14,19 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import java.io.IOException;
-import java.util.UUID;
 import java.util.function.Consumer;
 
 public abstract class AbstractSubcriber {
@@ -44,13 +43,13 @@
   public AbstractSubcriber(
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String gerritInstanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
-    this.instanceId = instanceId.toString();
+    this.instanceId = gerritInstanceId;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
     this.cfg = cfg;
@@ -59,19 +58,22 @@
 
   protected abstract EventTopic getTopic();
 
-  public Consumer<EventMessage> getConsumer() {
+  protected abstract Boolean shouldConsumeEvent(Event event);
+
+  public Consumer<Event> getConsumer() {
     return this::processRecord;
   }
 
-  private void processRecord(EventMessage event) {
-    String sourceInstanceId = event.getHeader().sourceInstanceId;
+  private void processRecord(Event event) {
+    String sourceInstanceId = event.instanceId;
 
-    if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
+    if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
+        || !shouldConsumeEvent(event)) {
       if (Strings.isNullOrEmpty(sourceInstanceId)) {
         logger.atWarning().log(
             String.format(
                 "Dropping event %s because sourceInstanceId cannot be null", event.toString()));
-      } else {
+      } else if (instanceId.equals(sourceInstanceId)) {
         logger.atFiner().log(
             String.format(
                 "Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
@@ -80,16 +82,16 @@
     } else {
       try {
         msgLog.log(Direction.CONSUME, topic, event);
-        eventRouter.route(event.getEvent());
+        eventRouter.route(event);
         subscriberMetrics.incrementSubscriberConsumedMessage();
-        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Malformed event '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Malformed event '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       } catch (PermissionBackendException | CacheNotFoundException e) {
-        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
+    subscriberMetrics.updateReplicationStatusMetrics(event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 5bbaee0..cdbf220 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -14,31 +14,49 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public BatchIndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.BATCH_INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index eae66b4..d8342b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,14 +15,14 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@@ -30,7 +30,7 @@
   public CacheEvictionEventSubscriber(
       CacheEvictionEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
@@ -42,4 +42,9 @@
   protected EventTopic getTopic() {
     return EventTopic.CACHE_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 6f4680c..b34e02c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface DroppedEventListener {
   /**
@@ -22,5 +22,5 @@
    *
    * @param event information about the event.
    */
-  void onEventDropped(EventMessage event);
+  void onEventDropped(Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 49d470a..480fc50 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -14,31 +14,68 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.entities.Change;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
+import java.util.Optional;
 
 @Singleton
 public class IndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+  private final ChangeFinder changeFinder;
+
   @Inject
   public IndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter,
+      ChangeFinder changeFinder) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
+    this.changeFinder = changeFinder;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      ChangeIndexEvent changeIndexEvent = (ChangeIndexEvent) event;
+      String projectName = changeIndexEvent.projectName;
+      if (isDeletedChangeWithEmptyProject(changeIndexEvent)) {
+        projectName = findProjectFromChangeId(changeIndexEvent.changeId).orElse(projectName);
+      }
+      return projectsFilter.matches(projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
+
+  private boolean isDeletedChangeWithEmptyProject(ChangeIndexEvent changeIndexEvent) {
+    return changeIndexEvent.deleted && changeIndexEvent.projectName.isEmpty();
+  }
+
+  private Optional<String> findProjectFromChangeId(int changeId) {
+    return changeFinder.findOne(Change.id(changeId)).map(c -> c.getChange().getProject().get());
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 6ff0969..069a516 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -14,31 +14,42 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import java.util.UUID;
 
 @Singleton
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public ProjectUpdateEventSubscriber(
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.PROJECT_LIST_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    return projectsFilter.matches(((ProjectListUpdateEvent) event).projectName);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
index c45dfa7..40168c5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -19,6 +19,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.server.cache.CacheModule;
 import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
 import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
@@ -40,7 +41,7 @@
 import java.util.stream.Collectors;
 
 @Singleton
-public class ReplicationStatus implements LifecycleListener {
+public class ReplicationStatus implements LifecycleListener, ProjectDeletedListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
@@ -120,12 +121,35 @@
     }
   }
 
+  void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
+    Optional<Long> localVersion =
+        projectVersionRefUpdate.get().getProjectLocalVersion(projectName.get());
+
+    if (!localVersion.isPresent() && replicationStatusPerProject.containsKey(projectName.get())) {
+      cache.invalidate(projectName.get());
+      replicationStatusPerProject.remove(projectName.get());
+      localVersionPerProject.remove(projectName.get());
+      verLogger.logDeleted(projectName);
+      logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
+    }
+  }
+
   @VisibleForTesting
   public void doUpdateLag(Project.NameKey projectName, Long lag) {
     cache.put(projectName.get(), lag);
     replicationStatusPerProject.put(projectName.get(), lag);
   }
 
+  @VisibleForTesting
+  Long getReplicationStatus(String projectName) {
+    return replicationStatusPerProject.get(projectName);
+  }
+
+  @VisibleForTesting
+  Long getLocalVersion(String projectName) {
+    return localVersionPerProject.get(projectName);
+  }
+
   @Override
   public void start() {
     loadAllFromCache();
@@ -139,4 +163,9 @@
         projectCache.all().stream().map(Project.NameKey::get).collect(Collectors.toSet());
     replicationStatusPerProject.putAll(cache.getAllPresent(cachedProjects));
   }
+
+  @Override
+  public void onProjectDeleted(Event event) {
+    removeProjectFromReplicationLagMetrics(Project.nameKey(event.getProjectName()));
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
index e954bc4..791517f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Scopes;
 
@@ -23,5 +25,6 @@
     bind(ReplicationStatus.class).in(Scopes.SINGLETON);
     install(ReplicationStatus.cacheModule());
     listener().to(ReplicationStatus.class);
+    DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 20c355e..ab64651 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -14,31 +14,45 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class StreamEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public StreamEventSubscriber(
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.STREAM_EVENT_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ProjectEvent) {
+      return projectsFilter.matches(((ProjectEvent) event).getProjectNameKey().get());
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 899233e..cce74f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,19 +14,20 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
-import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
-import com.googlesource.gerrit.plugins.replication.ReplicationScheduledEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
 
 @Singleton
 public class SubscriberMetrics extends MultiSiteMetrics {
@@ -74,20 +75,18 @@
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
 
-  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
-    Event event = eventMessage.getEvent();
-    if (event instanceof RefReplicationDoneEvent) {
-      RefReplicationDoneEvent replicationDone = (RefReplicationDoneEvent) event;
-      replicationStatus.updateReplicationLag(replicationDone.getProjectNameKey());
-    } else if (event instanceof RefReplicatedEvent) {
-      RefReplicatedEvent replicated = (RefReplicatedEvent) event;
-      replicationStatus.updateReplicationLag(replicated.getProjectNameKey());
-    } else if (event instanceof ReplicationScheduledEvent) {
-      ReplicationScheduledEvent updated = (ReplicationScheduledEvent) event;
-      replicationStatus.updateReplicationLag(updated.getProjectNameKey());
-    } else if (event instanceof RefUpdatedEvent) {
-      RefUpdatedEvent updated = (RefUpdatedEvent) event;
-      replicationStatus.updateReplicationLag(updated.getProjectNameKey());
+  public void updateReplicationStatusMetrics(Event event) {
+
+    if (event instanceof RefReplicationDoneEvent
+        || event instanceof RefReplicatedEvent
+        || event instanceof ReplicationScheduledEvent
+        || event instanceof RefUpdatedEvent) {
+      ProjectEvent projectEvent = (ProjectEvent) event;
+      replicationStatus.updateReplicationLag(projectEvent.getProjectNameKey());
+    } else if (event instanceof ProjectDeletionReplicationSucceededEvent) {
+      ProjectDeletionReplicationSucceededEvent projectDeletion =
+          (ProjectDeletionReplicationSucceededEvent) event;
+      replicationStatus.removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index 975916a..19936ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -49,6 +49,6 @@
       return true;
     }
 
-    return broker.send(eventTopic.topic(cfg), event);
+    return broker.sendSync(eventTopic.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 9ff4688..8a020fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -35,6 +35,6 @@
 
   @Override
   public boolean send(Event event) {
-    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
+    return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
index 317eb76..065ef7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
@@ -21,8 +21,8 @@
 
   public int accountId;
 
-  public AccountIndexEvent(int accountId) {
-    super(TYPE);
+  public AccountIndexEvent(int accountId, String instanceId) {
+    super(TYPE, instanceId);
     this.accountId = accountId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
index 1c2185f..4444756 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
@@ -22,8 +22,8 @@
   public String cacheName;
   public Object key;
 
-  public CacheEvictionEvent(String cacheName, Object key) {
-    super(TYPE);
+  public CacheEvictionEvent(String cacheName, Object key, String instanceId) {
+    super(TYPE, instanceId);
     this.cacheName = cacheName;
     this.key = key;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
index ab4ddf4..64fbdfb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
@@ -28,8 +28,8 @@
   public String targetSha;
   public boolean deleted;
 
-  public ChangeIndexEvent(String projectName, int changeId, boolean deleted) {
-    super(TYPE);
+  public ChangeIndexEvent(String projectName, int changeId, boolean deleted, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.changeId = changeId;
     this.deleted = deleted;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
index 4981b2f..05ac198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
@@ -24,8 +24,8 @@
   public final String groupUUID;
   public final ObjectId sha1;
 
-  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1) {
-    super(TYPE);
+  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1, String instanceId) {
+    super(TYPE, instanceId);
     this.groupUUID = groupUUID;
     this.sha1 = sha1;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
index ea2c3fb..2fdda72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
 public abstract class IndexEvent extends MultiSiteEvent {
-  protected IndexEvent(String type) {
-    super(type);
+  protected IndexEvent(String type, String instanceId) {
+    super(type, instanceId);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
index 404d168..f29204b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
@@ -29,7 +29,8 @@
     register(ProjectListUpdateEvent.TYPE, ProjectListUpdateEvent.class);
   }
 
-  protected MultiSiteEvent(String type) {
+  protected MultiSiteEvent(String type, String instanceId) {
     super(type);
+    this.instanceId = instanceId;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
index 8bdb7b5..954befb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
@@ -21,8 +21,8 @@
 
   public String projectName;
 
-  public ProjectIndexEvent(String projectName) {
-    super(TYPE);
+  public ProjectIndexEvent(String projectName, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
index d030e5b..0e18b27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
@@ -22,8 +22,8 @@
   public String projectName;
   public boolean remove;
 
-  public ProjectListUpdateEvent(String projectName, boolean remove) {
-    super(TYPE);
+  public ProjectListUpdateEvent(String projectName, boolean remove, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.remove = remove;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 202fb42..2cf83cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -32,7 +32,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.Set;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 4ef3426..95a3e66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -18,7 +18,7 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.io.IOException;
 
 public class StreamEventRouter implements ForwardedEventRouter<Event> {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 08b26f7..983a07b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.server.CommentsUtil;
 import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.util.ManualRequestContext;
@@ -42,6 +43,7 @@
   private final OneOffRequestContext oneOffReqCtx;
   private final String changeId;
   private final ChangeFinder changeFinder;
+  private final String instanceId;
   private Optional<Long> computedChangeTs = Optional.empty();
   private Optional<ChangeNotes> changeNotes = Optional.empty();
 
@@ -55,12 +57,14 @@
       CommentsUtil commentsUtil,
       ChangeFinder changeFinder,
       OneOffRequestContext oneOffReqCtx,
+      @GerritInstanceId String instanceId,
       @Assisted String changeId) {
     this.changeFinder = changeFinder;
     this.gitRepoMgr = gitRepoMgr;
     this.commentsUtil = commentsUtil;
     this.oneOffReqCtx = oneOffReqCtx;
     this.changeId = changeId;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -69,7 +73,8 @@
     return getComputedChangeTs()
         .map(
             ts -> {
-              ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, deleted);
+              ChangeIndexEvent event =
+                  new ChangeIndexEvent(projectName, changeId, deleted, instanceId);
               event.eventCreatedOn = ts;
               event.targetSha = getBranchTargetSha();
               return event;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index eef3e4b..02f1b1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.extensions.events.GroupIndexedListener;
 import com.google.gerrit.extensions.events.ProjectIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
@@ -48,6 +49,7 @@
   private final ChangeCheckerImpl.Factory changeChecker;
   private final ProjectsFilter projectsFilter;
   private final GroupChecker groupChecker;
+  private final String instanceId;
 
   @Inject
   IndexEventHandler(
@@ -55,18 +57,20 @@
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
       ProjectsFilter projectsFilter,
-      GroupChecker groupChecker) {
+      GroupChecker groupChecker,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
     this.projectsFilter = projectsFilter;
     this.groupChecker = groupChecker;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onAccountIndexed(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id));
+      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -87,7 +91,8 @@
   public void onGroupIndexed(String groupUUID) {
     if (!Context.isForwardedEvent()) {
       IndexGroupTask task =
-          new IndexGroupTask(new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID)));
+          new IndexGroupTask(
+              new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID), instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -97,7 +102,7 @@
   @Override
   public void onProjectIndexed(String projectName) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
-      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName));
+      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -132,7 +137,7 @@
 
   private void executeDeleteChangeTask(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true));
+      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index 28eddbb..ea7dada 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -36,7 +36,6 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.io.IOException;
 import java.util.Optional;
-import java.util.Set;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
@@ -48,7 +47,7 @@
 @Singleton
 public class ProjectVersionRefUpdate implements EventListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
+  private static final ImmutableSet<RefUpdate.Result> SUCCESSFUL_RESULTS =
       ImmutableSet.of(RefUpdate.Result.NEW, RefUpdate.Result.FORCED, RefUpdate.Result.NO_CHANGE);
 
   public static final String MULTI_SITE_VERSIONING_REF = "refs/multi-site/version";
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 623871b..159e4a0 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -7,7 +7,8 @@
 message broker for aligning with the other masters over different sites.
 
 The masters must be:
-
+* Gerrit instance id is mandatory for @PLUGIN@ plugin. All the master 
+  must have [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit) populated.
 * events-broker library must be installed as a library module in the
   `$GERRIT_SITE/lib` directory of all the masters
 * global-refdb library must be installed as a library module in the
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
deleted file mode 100644
index 2df60dd..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.gerrit.server.config.SitePaths;
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ModuleTest {
-
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private Configuration configMock;
-
-  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  private Module module;
-
-  @Before
-  public void setup() {
-    module = new Module(configMock);
-  }
-
-  @Test
-  public void shouldGetInstanceId() throws Exception {
-    File tmpSitePath = tempFolder.newFolder();
-    File tmpPluginDataPath =
-        Paths.get(tmpSitePath.getPath(), "data", Configuration.PLUGIN_NAME).toFile();
-    tmpPluginDataPath.mkdirs();
-    Path path = Paths.get(tmpPluginDataPath.getPath(), Configuration.INSTANCE_ID_FILE);
-    SitePaths sitePaths = new SitePaths(Paths.get(tmpSitePath.getPath()));
-    assertThat(path.toFile().exists()).isFalse();
-
-    UUID gotUUID1 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isNotNull();
-    assertThat(path.toFile().exists()).isTrue();
-
-    UUID gotUUID2 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isEqualTo(gotUUID2);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 92fa101..7d1751c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -1,15 +1,19 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -22,28 +26,35 @@
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
   @Mock MessageLogger msgLog;
-  private UUID instanceId = UUID.randomUUID();
   private String topic = "index";
 
   private BrokerApiWrapper objectUnderTest;
 
   @Before
   public void setUp() {
+    event.instanceId = "instance-id";
     objectUnderTest =
         new BrokerApiWrapper(
-            DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
+            MoreExecutors.directExecutor(),
+            DynamicItem.itemOf(BrokerApi.class, brokerApi),
+            brokerMetrics,
+            msgLog);
   }
 
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
-    when(brokerApi.send(any(), any())).thenReturn(true);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.set(true);
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
   @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
-    when(brokerApi.send(any(), any())).thenReturn(false);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.setException(new Exception("Force Future failure"));
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
@@ -53,10 +64,26 @@
     when(brokerApi.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.send(topic, event);
+      objectUnderTest.sendSync(topic, event);
     } catch (RuntimeException e) {
       // expected
     }
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
+
+  @Test
+  public void shouldSkipMessageSendingWhenInstanceIdIsNull() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = null;
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event));
+  }
+
+  @Test
+  public void shouldSkipMessageSendingWhenInstanceIdIsEmpty() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = "";
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index 67be583..ce222d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -36,9 +36,10 @@
 
   @Test
   public void shouldNotPublishAccountsCacheEvictions() {
-
+    String instanceId = "instance-id";
     final CacheEvictionHandler<String, String> handler =
-        new CacheEvictionHandler<>(DynamicSet.emptySet(), executorMock, defaultCacheMatcher);
+        new CacheEvictionHandler<>(
+            DynamicSet.emptySet(), executorMock, defaultCacheMatcher, instanceId);
 
     handler.onRemoval(
         "test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index 4263ddb..c8216bd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -41,6 +41,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private ProjectListUpdateHandler handler;
 
   @Mock private ProjectListUpdateForwarder forwarder;
@@ -51,7 +53,7 @@
     when(projectsFilter.matches(any(String.class))).thenReturn(true);
     handler =
         new ProjectListUpdateHandler(
-            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
+            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
   }
 
   private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -69,7 +71,8 @@
     handler.onNewProjectCreated(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID)));
   }
 
   @Test
@@ -80,7 +83,8 @@
     handler.onProjectDeleted(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
@@ -101,18 +105,22 @@
     handler.onNewProjectCreated(event);
     verify(forwarder, never())
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
   public void testProjectUpdateTaskToString() throws Exception {
     String projectName = "someProjectName";
     ProjectListUpdateTask task =
-        handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false));
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(String.format("Update project list in target instance: add '%s'", projectName));
 
-    task = handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true));
+    task =
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(
             String.format("Update project list in target instance: remove '%s'", projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
new file mode 100644
index 0000000..1e11bf9
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -0,0 +1,164 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Broker;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Ignore
+public abstract class AbstractSubscriberTestBase {
+  protected static final String NODE_INSTANCE_ID = "node-instance-id";
+  protected static final String INSTANCE_ID = "other-node-instance-id";
+  protected static final String PROJECT_NAME = "project-name";
+
+  @Mock protected DroppedEventListener droppedEventListeners;
+  @Mock protected MessageLogger msgLog;
+  @Mock protected SubscriberMetrics subscriberMetrics;
+  @Mock protected Configuration cfg;
+  @Mock protected Broker brokerCfg;
+  @Mock protected ProjectsFilter projectsFilter;
+
+  @SuppressWarnings("rawtypes")
+  protected ForwardedEventRouter eventRouter;
+
+  protected AbstractSubcriber objectUnderTest;
+
+  @Before
+  public void setup() {
+    when(cfg.broker()).thenReturn(brokerCfg);
+    when(brokerCfg.getTopic(any(), any())).thenReturn("test-topic");
+    eventRouter = eventRouter();
+    objectUnderTest = objectUnderTest();
+  }
+
+  @Test
+  public void shouldConsumeEventsWhenNotFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+      objectUnderTest.getConsumer().accept(event);
+      verifyConsumed(event);
+    }
+  }
+
+  @Test
+  public void shouldSkipEventsWhenFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(false);
+      objectUnderTest.getConsumer().accept(event);
+      verifySkipped(event);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldSkipLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(projectsFilter, never()).matches(PROJECT_NAME);
+      verify(eventRouter, never()).route(event);
+      verify(droppedEventListeners, times(1)).onEventDropped(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners);
+    }
+  }
+
+  @Test
+  public void shouldUpdateReplicationMetricsWithLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
+  @Test
+  public void shouldUpdateReplicationMetricsWithNonLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
+  protected abstract AbstractSubcriber objectUnderTest();
+
+  protected abstract List<Event> events();
+
+  @SuppressWarnings("rawtypes")
+  protected abstract ForwardedEventRouter eventRouter();
+
+  @SuppressWarnings("unchecked")
+  protected void verifySkipped(Event event)
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, never()).route(event);
+    verify(droppedEventListeners, times(1)).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void verifyConsumed(Event event)
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners);
+  }
+
+  protected DynamicSet<DroppedEventListener> asDynamicSet(DroppedEventListener listener) {
+    DynamicSet<DroppedEventListener> result = new DynamicSet<>();
+    result.add("multi-site", listener);
+    return result;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
new file mode 100644
index 0000000..5031f36
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class BatchIndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false;
+  private static final int CHANGE_ID = 1;
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new BatchIndexEventSubscriber(
+        (IndexEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
new file mode 100644
index 0000000..0e63528
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Account;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.util.time.TimeUtil;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class IndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false;
+  private static final int CHANGE_ID = 1;
+  private static final String EMPTY_PROJECT_NAME = "";
+
+  @Mock protected ChangeFinder changeFinderMock;
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenFound()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    ChangeIndexEvent event = new ChangeIndexEvent(EMPTY_PROJECT_NAME, CHANGE_ID, true, INSTANCE_ID);
+
+    ChangeNotes changeNotesMock = mock(ChangeNotes.class);
+    when(changeNotesMock.getChange()).thenReturn(newChange());
+    when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.of(changeNotesMock));
+    when(projectsFilter.matches(PROJECT_NAME)).thenReturn(true);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldNOTConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenNotFound()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    ChangeIndexEvent event = new ChangeIndexEvent("", CHANGE_ID, true, INSTANCE_ID);
+
+    when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.empty());
+    when(projectsFilter.matches(EMPTY_PROJECT_NAME)).thenReturn(false);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, times(1)).matches(EMPTY_PROJECT_NAME);
+    verify(eventRouter, never()).route(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new IndexEventSubscriber(
+        (IndexEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter,
+        changeFinderMock);
+  }
+
+  private Change newChange() {
+    return new Change(
+        Change.key(Integer.toString(CHANGE_ID)),
+        Change.id(CHANGE_ID),
+        Account.id(9999),
+        BranchNameKey.create(Project.nameKey(PROJECT_NAME), "refs/heads/master"),
+        TimeUtil.nowTs());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..201ee81
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import java.util.List;
+
+public class ProjectUpdateEventSubscriberTest extends AbstractSubscriberTestBase {
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(ProjectListUpdateRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new ProjectUpdateEventSubscriber(
+        (ProjectListUpdateRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
index dc1a564..a2a5e73 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -16,16 +16,19 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -90,4 +93,73 @@
 
     assertThat(replicationStatusCache.getIfPresent("projectA")).isEqualTo(20L);
   }
+
+  @Test
+  public void shouldRemoveProjectFromPersistedCache() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+  }
+
+  @Test
+  public void shouldRemoveProjectFromReplicationLags() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).isEmpty();
+  }
+
+  @Test
+  public void shouldNotRemoveProjectFromReplicationLagsIfLocalVersionStillExists() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(System.currentTimeMillis()));
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+  }
+
+  @Test
+  public void shouldNotEvictFromPersistentCacheIfLocalVersionStillExists() {
+    String projectName = "projectA";
+    long lag = 100;
+    setupReplicationLag(projectName, lag);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(System.currentTimeMillis()));
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(lag);
+  }
+
+  private void setupReplicationLag(String projectName, long lag) {
+    long currentVersion = System.currentTimeMillis();
+    long newVersion = currentVersion + lag;
+    replicationStatusCache.put(projectName, 3L);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(projectName))
+        .thenReturn(Optional.of(newVersion));
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(currentVersion));
+    objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+  }
+
+  private ProjectDeletedListener.Event projectDeletedEvent(String projectName) {
+    ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
+    when(event.getProjectName()).thenReturn(projectName);
+    return event;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
new file mode 100644
index 0000000..a0e97a8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class StreamEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final NameKey PROJECT_NAME_KEY = NameKey.parse(PROJECT_NAME);
+  private @Mock RefUpdatedEvent refUpdatedEvent;
+  private @Mock RefReplicationDoneEvent refReplicationDoneEvent;
+  private @Mock ReplicationScheduledEvent replicationScheduledEvent;
+  private @Mock RefReplicatedEvent refReplicatedEvent;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(StreamEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    when(refUpdatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refUpdatedEvent.instanceId = INSTANCE_ID;
+    when(refReplicationDoneEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refReplicationDoneEvent.instanceId = INSTANCE_ID;
+    when(replicationScheduledEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    replicationScheduledEvent.instanceId = INSTANCE_ID;
+    when(refReplicatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refReplicatedEvent.instanceId = INSTANCE_ID;
+
+    return ImmutableList.of(
+        refUpdatedEvent, refReplicationDoneEvent, replicationScheduledEvent, refReplicatedEvent);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldNotConsumeNonProjectEventTypeEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new StreamEventSubscriber(
+        (StreamEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 6aa2ce6..8ed4a14 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -14,24 +14,26 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
 import com.google.common.base.Suppliers;
 import com.google.common.cache.CacheBuilder;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import java.net.URISyntaxException;
 import java.util.Optional;
-import java.util.UUID;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,27 +46,23 @@
   private static final Project.NameKey A_TEST_PROJECT_NAME_KEY =
       Project.nameKey(A_TEST_PROJECT_NAME);
 
-  @Mock private SharedRefDatabaseWrapper sharedRefDb;
-  @Mock private GitReferenceUpdated gitReferenceUpdated;
   @Mock private MetricMaker metricMaker;
   @Mock private ProjectVersionLogger verLogger;
   @Mock private ProjectCache projectCache;
   @Mock private Provider<ProjectVersionRefUpdate> projectVersionRefUpdateProvider;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
   private SubscriberMetrics metrics;
-  private EventMessage.Header msgHeader;
+  private ReplicationStatus replicationStatus;
 
   @Before
   public void setup() throws Exception {
-    msgHeader = new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID());
-    metrics =
-        new SubscriberMetrics(
-            metricMaker,
-            new ReplicationStatus(
-                CacheBuilder.newBuilder().build(),
-                projectVersionRefUpdateProvider,
-                verLogger,
-                projectCache));
+    replicationStatus =
+        new ReplicationStatus(
+            CacheBuilder.newBuilder().build(),
+            projectVersionRefUpdateProvider,
+            verLogger,
+            projectCache);
+    metrics = new SubscriberMetrics(metricMaker, replicationStatus);
     when(projectVersionRefUpdateProvider.get()).thenReturn(projectVersionRefUpdate);
   }
 
@@ -76,7 +74,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -92,13 +90,102 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
     verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, globalRefDbVersion.get(), replicationLag);
   }
 
+  @Test
+  public void
+      shouldLogUponProjectDeletionSuccessWhenLocalVersionDoesNotExistAndSubscriberMetricsExist()
+          throws Exception {
+    long nowSecs = System.currentTimeMillis() / 1000;
+    long replicationLagSecs = 60;
+    Optional<Long> globalRefDbVersion = Optional.of(nowSecs);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagSecs));
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion);
+
+    Event refUpdateEventMessage = newRefUpdateEvent();
+    metrics.updateReplicationStatusMetrics(refUpdateEventMessage);
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagSecs);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+
+    Event projectDeleteEventMessage = projectDeletionSuccess();
+    metrics.updateReplicationStatusMetrics(projectDeleteEventMessage);
+
+    verify(verLogger).logDeleted(A_TEST_PROJECT_NAME_KEY);
+  }
+
+  @Test
+  public void shouldNotLogUponProjectDeletionSuccessWhenSubscriberMetricsDoNotExist()
+      throws Exception {
+    Event eventMessage = projectDeletionSuccess();
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+
+    metrics.updateReplicationStatusMetrics(eventMessage);
+
+    verifyZeroInteractions(verLogger);
+  }
+
+  @Test
+  public void shouldNotLogUponProjectDeletionSuccessWhenLocalVersionStillExists() throws Exception {
+    Event eventMessage = projectDeletionSuccess();
+    Optional<Long> anyRefVersionValue = Optional.of(System.currentTimeMillis() / 1000);
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(anyRefVersionValue);
+
+    metrics.updateReplicationStatusMetrics(eventMessage);
+
+    verifyZeroInteractions(verLogger);
+  }
+
+  @Test
+  public void shouldRemoveProjectMetricsUponProjectDeletionSuccess() throws Exception {
+    long nowSecs = System.currentTimeMillis() / 1000;
+    long replicationLagSecs = 60;
+    Optional<Long> globalRefDbVersion = Optional.of(nowSecs);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagSecs));
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion);
+
+    Event eventMessage = newRefUpdateEvent();
+
+    metrics.updateReplicationStatusMetrics(eventMessage);
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagSecs);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+    Event projectDeleteEvent = projectDeletionSuccess();
+
+    metrics.updateReplicationStatusMetrics(projectDeleteEvent);
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+  }
+
+  private ProjectDeletionReplicationSucceededEvent projectDeletionSuccess()
+      throws URISyntaxException {
+    return new ProjectDeletionReplicationSucceededEvent(
+        A_TEST_PROJECT_NAME, new URIish("git://target"));
+  }
+
   private RefUpdatedEvent newRefUpdateEvent() {
     RefUpdateAttribute refUpdate = new RefUpdateAttribute();
     refUpdate.project = A_TEST_PROJECT_NAME;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index 76c001f..6762ba2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -34,6 +34,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class CacheEvictionEventRouterTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private static Gson gson = new EventGsonProvider().get();
   private CacheEvictionEventRouter router;
   @Mock private ForwardedCacheEvictionHandler cacheEvictionHandler;
@@ -45,7 +46,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_CacheEviction() throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -54,7 +55,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_CacheEvictionWithSlash()
       throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -63,7 +64,8 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectCacheEvictionWithSlash()
       throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent(Constants.PROJECTS, "some/project");
+    final CacheEvictionEvent event =
+        new CacheEvictionEvent(Constants.PROJECTS, "some/project", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 1dc07bd..8efa2ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -33,7 +33,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.util.Optional;
 import org.eclipse.jgit.lib.ObjectId;
 import org.junit.Before;
@@ -44,7 +44,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventRouterTest {
-
+  private static final String INSTANCE_ID = "instance-id";
   private IndexEventRouter router;
   @Mock private ForwardedIndexAccountHandler indexAccountHandler;
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
@@ -66,7 +66,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_AccountIndex() throws Exception {
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -80,7 +80,7 @@
 
     StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
 
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -96,7 +96,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
-    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId());
+    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId(), INSTANCE_ID);
     router.route(event);
 
     verify(indexGroupHandler)
@@ -108,7 +108,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectIndex() throws Exception {
     final String projectName = "projectName";
-    final ProjectIndexEvent event = new ProjectIndexEvent(projectName);
+    final ProjectIndexEvent event = new ProjectIndexEvent(projectName, INSTANCE_ID);
     router.route(event);
 
     verify(indexProjectHandler)
@@ -119,7 +119,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndex() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -133,7 +133,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndexDelete() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -147,7 +147,7 @@
 
   @Test
   public void routerShouldFailForNotRecognisedEvents() throws Exception {
-    final IndexEvent newEventType = new IndexEvent("new-type") {};
+    final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
 
     assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
     verifyZeroInteractions(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 93daf92..8a21c39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -38,7 +38,8 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectListUpdate() throws Exception {
-    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false);
+    String instanceId = "instance-id";
+    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false, instanceId);
     router.route(event);
 
     verify(projectListUpdateHandler).update(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index e3d1ae0..28bf56d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -68,10 +68,10 @@
     }
   }
 
-  public class TestEvent extends MultiSiteEvent {
+  public static class TestEvent extends MultiSiteEvent {
 
     protected TestEvent() {
-      super("test");
+      super("test", "instance-id");
     }
   }
 
@@ -93,7 +93,7 @@
   @Test
   public void shouldSendEventToBrokerFromGenericSourceThread() {
     brokerForwarder.send(newForwarderTask(), testTopic, testEvent);
-    verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+    verify(brokerMock).sendSync(eq(testTopicName), eq(testEvent));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index 826e154..75596ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Sets;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.ProjectInput;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -123,8 +124,10 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldEvictProjectCache() throws Exception {
-    objectUnderTest.route(new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get()));
+    objectUnderTest.route(
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get(), "instance-id"));
     evictionsCacheTracker.waitForExpectedEvictions();
 
     assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
@@ -132,6 +135,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "instance-id")
   public void shouldEvictProjectCacheWithSlash() throws Exception {
     ProjectInput in = new ProjectInput();
     in.name = name("my/project");
@@ -141,7 +145,7 @@
     restartCacheEvictionsTracking();
 
     objectUnderTest.route(
-        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get()));
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get(), "instance-id"));
 
     evictionsCacheTracker.waitForExpectedEvictions();
     assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
index 90ae8da..dbd358d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.mockito.Mockito.doReturn;
 
 import com.google.common.cache.Cache;
@@ -48,10 +49,12 @@
   public void shouldThrowAnExceptionWhenCacheNotFound() throws Exception {
     CacheEntry entry = new CacheEntry("somePlugin", "unexistingCache", null);
 
-    exception.expect(CacheNotFoundException.class);
-    exception.expectMessage(
-        String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
-    handler.evict(entry);
+    CacheNotFoundException thrown =
+        assertThrows(CacheNotFoundException.class, () -> handler.evict(entry));
+    assertThat(thrown)
+        .hasMessageThat()
+        .isEqualTo(
+            String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
index 32c6319..02efb77 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -61,9 +61,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from account index not supported");
-    handler.index(id, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(id, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from account index not supported");
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 96470b6..452a0d2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -104,7 +104,9 @@
   public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
     setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
     handler.index(
-        TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false)));
+        TEST_CHANGE_ID,
+        Operation.INDEX,
+        Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
     verify(indexerMock, times(1)).index(any(Change.class));
   }
 
@@ -125,8 +127,9 @@
   @Test
   public void indexerThrowsStorageExceptionTryingToIndexChange() throws Exception {
     setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE);
-    exception.expect(StorageException.class);
-    handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+    assertThrows(
+        StorageException.class,
+        () -> handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 982ac52..b49dbfa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -74,9 +74,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from group index not supported");
-    handler.index(uuid, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(uuid, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from group index not supported");
   }
 
   @Test
@@ -141,6 +143,6 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
-    return Optional.of(new GroupIndexEvent(uuid, null));
+    return Optional.of(new GroupIndexEvent(uuid, null, "instance-id"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
index 3ce5e14..72b9427 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -73,9 +73,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from project index not supported");
-    handler.index(nameKey, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(nameKey, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from project index not supported");
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
index 9893ce7..2412c79 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
@@ -34,6 +34,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private static final String PROJECT_NAME = "someProject";
   private static final String SOME_MESSAGE = "someMessage";
   private static final Project.NameKey PROJECT_KEY = Project.nameKey(PROJECT_NAME);
@@ -48,13 +49,13 @@
 
   @Test
   public void testSuccessfulAdd() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
   }
 
   @Test
   public void testSuccessfulRemove() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     verify(projectCacheMock).remove(PROJECT_KEY);
   }
 
@@ -72,7 +73,7 @@
         .onCreateProject(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
@@ -92,7 +93,7 @@
         .remove(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).remove(PROJECT_KEY);
@@ -113,7 +114,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
@@ -135,7 +136,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
index 6403cce..09f8f59 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -101,7 +101,7 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid, @Nullable ObjectId sha1) {
-    return Optional.of(new GroupIndexEvent(uuid, sha1));
+    return Optional.of(new GroupIndexEvent(uuid, sha1, "instance-id"));
   }
 
   private void setCommitExistsInRepo(boolean commitExists) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
index 93cec05..099f1dd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
@@ -16,7 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import java.util.UUID;
@@ -24,13 +24,14 @@
 import org.junit.Test;
 
 public class GroupEventIndexTest {
+  private static final String INSTANCE_ID = "instance-id";
   private static final Gson gson = new EventGsonProvider().get();
 
   @Test
   public void groupEventIndexRoundTripWithSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
     ObjectId anObjectId = ObjectId.fromString("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
@@ -38,7 +39,7 @@
   @Test
   public void groupEventIndexRoundTripWithoutSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 660a302..3fd13c6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -36,6 +36,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private IndexEventHandler eventHandler;
 
   @Mock private ProjectsFilter projectsFilter;
@@ -50,7 +52,8 @@
             asDynamicSet(forwarder),
             changeChecker,
             projectsFilter,
-            new TestGroupChecker(true));
+            new TestGroupChecker(true),
+            INSTANCE_ID);
   }
 
   private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -65,7 +68,7 @@
 
     eventHandler.onProjectIndexed("test_project");
     verify(forwarder, never())
-        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project")));
+        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
   }
 
   @Test