Send/receive Event object instead of EventMessage

Event object contains instance id populated by Gerrit Core. Workaround
with EventMessage using sourceInstanceId field to recognise the event
source node is not needed anymore. Use Event object instead of
EventMessage.

Bug: Issue 14390
Change-Id: I0296b8b935faa30f935d9891d3286d21142e5aef
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 93ddd9e..b91d0d7 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.2",
-        sha1 = "8a56300ce92c3e25b4669a0511b4c520b34851b2",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index c4df06a..5dce6e6 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -5,6 +5,7 @@
     installModule = com.gerritforge.gerrit.eventbroker.BrokerApiModule # events-broker module to setup BrokerApi dynamic item
     installModule = com.googlesource.gerrit.plugins.multisite.Module # multi-site needs to be a gerrit lib
     installDbModule = com.googlesource.gerrit.plugins.multisite.GitModule
+    instanceId = $INSTANCE_ID
 [database]
     type = h2
     database = $LOCATION_TEST_SITE/db/ReviewDB
@@ -43,7 +44,7 @@
 [plugin "events-kafka"]
     sendAsync = true
     bootstrapServers = localhost:$BROKER_PORT
-    groupId = $GROUP_ID
+    groupId = $INSTANCE_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
@@ -55,12 +56,12 @@
     pollingIntervalMs = 1000
     region = us-east-1
     endpoint = http://localhost:$BROKER_PORT
-    applicationName = $GROUP_ID
+    applicationName = $INSTANCE_ID
     initialPosition = trim_horizon
 [plugin "events-gcloud-pubsub"]
     numberOfSubscribers = 6
     gcloudProject="test-project"
-    subscriptionId=$GROUP_ID
+    subscriptionId=$INSTANCE_ID
     privateKeyLocation="not used in local mode"
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 083ae66..f745482 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -70,7 +70,7 @@
     export GERRIT_HOSTNAME=$7
     export REPLICATION_HOSTNAME=$8
     export REMOTE_DEBUG_PORT=$9
-    export GROUP_ID=${10}
+    export INSTANCE_ID=${10}
     export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -122,21 +122,21 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_GROUP_ID="instance-1"
+  GERRIT_SITE1_INSTANCE_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_GROUP_ID="instance-2"
+  GERRIT_SITE2_INSTANCE_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_INSTANCE_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_INSTANCE_ID
 }
 
 function is_docker_desktop {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
deleted file mode 100644
index 87306a2..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-public @interface InstanceId {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index 1ec8886..95c417f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.util.PluginLogFile;
 import com.google.gerrit.server.util.SystemLog;
@@ -41,7 +41,7 @@
   }
 
   @Override
-  public void log(Direction direction, String topic, EventMessage event) {
+  public void log(Direction direction, String topic, Event event) {
     msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index b1f3e79..cc64b02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface MessageLogger {
 
@@ -23,5 +23,5 @@
     CONSUME;
   }
 
-  public void log(Direction direction, String topic, EventMessage event);
+  public void log(Direction direction, String topic, Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 44c4554..ad53449 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -16,11 +16,8 @@
 
 import com.gerritforge.gerrit.globalrefdb.validation.LibModule;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.CreationException;
 import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
@@ -28,19 +25,9 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Module extends LifecycleModule {
-  private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
 
   @Inject
@@ -77,55 +64,4 @@
 
     install(new RouterModule());
   }
-
-  @Provides
-  @Singleton
-  @InstanceId
-  UUID getInstanceId(SitePaths sitePaths) throws IOException {
-    UUID instanceId = null;
-    Path dataDir = sitePaths.data_dir.resolve(Configuration.PLUGIN_NAME);
-    if (!dataDir.toFile().exists()) {
-      dataDir.toFile().mkdirs();
-    }
-    String serverIdFile =
-        dataDir.toAbsolutePath().toString() + "/" + Configuration.INSTANCE_ID_FILE;
-
-    instanceId = tryToLoadSavedInstanceId(serverIdFile);
-
-    if (instanceId == null) {
-      instanceId = UUID.randomUUID();
-      Files.createFile(Paths.get(serverIdFile));
-      try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(serverIdFile))) {
-        writer.write(instanceId.toString());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot write instance ID, a new one will be generated at instance restart. (%s)",
-                e.getMessage()));
-      }
-    }
-    return instanceId;
-  }
-
-  private UUID tryToLoadSavedInstanceId(String serverIdFile) {
-    if (Files.exists(Paths.get(serverIdFile))) {
-      try (BufferedReader br = Files.newBufferedReader(Paths.get(serverIdFile))) {
-        return UUID.fromString(br.readLine());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot read instance ID from path '%s', deleting the old file and generating a new ID: (%s)",
-                serverIdFile, e.getMessage()));
-        try {
-          Files.delete(Paths.get(serverIdFile));
-        } catch (IOException e1) {
-          log.warn(
-              String.format(
-                  "Cannot delete old instance ID file at path '%s' with instance ID while generating a new one: (%s)",
-                  serverIdFile, e1.getMessage()));
-        }
-      }
-    }
-    return null;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index cff9899..f58efa7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -15,8 +15,8 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -24,12 +24,10 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import org.slf4j.Logger;
@@ -41,25 +39,22 @@
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
-  private final UUID instanceId;
 
   @Inject
   public BrokerApiWrapper(
       @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
-      MessageLogger msgLog,
-      @InstanceId UUID instanceId) {
+      MessageLogger msgLog) {
     this.apiDelegate = apiDelegate;
     this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
-    this.instanceId = instanceId;
   }
 
-  public boolean send(String topic, Event event) {
+  public boolean sendSync(String topic, Event event) {
     try {
-      return send(topic, apiDelegate.get().newMessage(instanceId, event)).get();
+      return send(topic, event).get();
     } catch (Throwable e) {
       log.error(
           "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
@@ -73,13 +68,21 @@
   }
 
   @Override
-  public ListenableFuture<Boolean> send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, Event message) {
     SettableFuture<Boolean> resultFuture = SettableFuture.create();
     if (Context.isForwardedEvent()) {
       resultFuture.set(true);
       return resultFuture;
     }
 
+    if (Strings.isNullOrEmpty(message.instanceId)) {
+      log.warn(
+          "Dropping event '{}' because event instance id cannot be null or empty",
+          message.toString());
+      resultFuture.set(true);
+      return resultFuture;
+    }
+
     ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
     Futures.addCallback(
         resfultF,
@@ -106,7 +109,7 @@
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+  public void receiveAsync(String topic, Consumer<Event> messageConsumer) {
     apiDelegate.get().receiveAsync(topic, messageConsumer);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 2990264..e418da5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -17,6 +17,7 @@
 import com.google.common.cache.RemovalNotification;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -28,21 +29,25 @@
   private final Executor executor;
   private final DynamicSet<CacheEvictionForwarder> forwarders;
   private final CachePatternMatcher matcher;
+  private final String instanceId;
 
   @Inject
   CacheEvictionHandler(
       DynamicSet<CacheEvictionForwarder> forwarders,
       @CacheExecutor Executor executor,
-      CachePatternMatcher matcher) {
+      CachePatternMatcher matcher,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.matcher = matcher;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onRemoval(String plugin, String cache, RemovalNotification<K, V> notification) {
     if (!Context.isForwardedEvent() && !notification.wasEvicted() && matcher.matches(cache)) {
-      executor.execute(new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey())));
+      executor.execute(
+          new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey(), instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index fdc6fc3..44f8417 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.events.ProjectEvent;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -33,15 +34,18 @@
   private final DynamicSet<ProjectListUpdateForwarder> forwarders;
   private final Executor executor;
   private final ProjectsFilter projectsFilter;
+  private final String instanceId;
 
   @Inject
   public ProjectListUpdateHandler(
       DynamicSet<ProjectListUpdateForwarder> forwarders,
       @CacheExecutor Executor executor,
-      ProjectsFilter filter) {
+      ProjectsFilter filter,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.projectsFilter = filter;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -59,7 +63,8 @@
   private void process(ProjectEvent event, boolean delete) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
       executor.execute(
-          new ProjectListUpdateTask(new ProjectListUpdateEvent(event.getProjectName(), delete)));
+          new ProjectListUpdateTask(
+              new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index b37f434..7a20c7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,20 +14,19 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import java.io.IOException;
-import java.util.UUID;
 import java.util.function.Consumer;
 
 public abstract class AbstractSubcriber {
@@ -44,13 +43,13 @@
   public AbstractSubcriber(
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String gerritInstanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
-    this.instanceId = instanceId.toString();
+    this.instanceId = gerritInstanceId;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
     this.cfg = cfg;
@@ -59,12 +58,12 @@
 
   protected abstract EventTopic getTopic();
 
-  public Consumer<EventMessage> getConsumer() {
+  public Consumer<Event> getConsumer() {
     return this::processRecord;
   }
 
-  private void processRecord(EventMessage event) {
-    String sourceInstanceId = event.getHeader().sourceInstanceId;
+  private void processRecord(Event event) {
+    String sourceInstanceId = event.instanceId;
 
     if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
       if (Strings.isNullOrEmpty(sourceInstanceId)) {
@@ -80,14 +79,14 @@
     } else {
       try {
         msgLog.log(Direction.CONSUME, topic, event);
-        eventRouter.route(event.getEvent());
+        eventRouter.route(event);
         subscriberMetrics.incrementSubscriberConsumedMessage();
         subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Malformed event '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Malformed event '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       } catch (PermissionBackendException | CacheNotFoundException e) {
-        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 5bbaee0..80f61f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -15,14 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class BatchIndexEventSubscriber extends AbstractSubcriber {
@@ -30,7 +29,7 @@
   public BatchIndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index eae66b4..5f57156 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,14 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@@ -30,7 +29,7 @@
   public CacheEvictionEventSubscriber(
       CacheEvictionEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 6f4680c..b34e02c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface DroppedEventListener {
   /**
@@ -22,5 +22,5 @@
    *
    * @param event information about the event.
    */
-  void onEventDropped(EventMessage event);
+  void onEventDropped(Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 49d470a..8809799 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -15,14 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class IndexEventSubscriber extends AbstractSubcriber {
@@ -30,7 +29,7 @@
   public IndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 6ff0969..239f3ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -15,14 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import java.util.UUID;
 
 @Singleton
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
@@ -30,7 +29,7 @@
   public ProjectUpdateEventSubscriber(
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 20c355e..57a3f51 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -15,14 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class StreamEventSubscriber extends AbstractSubcriber {
@@ -30,7 +29,7 @@
   public StreamEventSubscriber(
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 1f264b0..f7d5f99 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
@@ -101,8 +100,7 @@
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
 
-  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
-    Event event = eventMessage.getEvent();
+  public void updateReplicationStatusMetrics(Event event) {
 
     if (event instanceof RefReplicationDoneEvent
         || event instanceof RefReplicatedEvent
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index da19fed..c833bbc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -46,6 +46,6 @@
       return true;
     }
 
-    return broker.send(eventTopic.topic(cfg), event);
+    return broker.sendSync(eventTopic.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 9ff4688..8a020fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -35,6 +35,6 @@
 
   @Override
   public boolean send(Event event) {
-    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
+    return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
index 317eb76..065ef7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
@@ -21,8 +21,8 @@
 
   public int accountId;
 
-  public AccountIndexEvent(int accountId) {
-    super(TYPE);
+  public AccountIndexEvent(int accountId, String instanceId) {
+    super(TYPE, instanceId);
     this.accountId = accountId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
index 1c2185f..4444756 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
@@ -22,8 +22,8 @@
   public String cacheName;
   public Object key;
 
-  public CacheEvictionEvent(String cacheName, Object key) {
-    super(TYPE);
+  public CacheEvictionEvent(String cacheName, Object key, String instanceId) {
+    super(TYPE, instanceId);
     this.cacheName = cacheName;
     this.key = key;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
index ab4ddf4..64fbdfb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
@@ -28,8 +28,8 @@
   public String targetSha;
   public boolean deleted;
 
-  public ChangeIndexEvent(String projectName, int changeId, boolean deleted) {
-    super(TYPE);
+  public ChangeIndexEvent(String projectName, int changeId, boolean deleted, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.changeId = changeId;
     this.deleted = deleted;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
index 4981b2f..05ac198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
@@ -24,8 +24,8 @@
   public final String groupUUID;
   public final ObjectId sha1;
 
-  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1) {
-    super(TYPE);
+  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1, String instanceId) {
+    super(TYPE, instanceId);
     this.groupUUID = groupUUID;
     this.sha1 = sha1;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
index ea2c3fb..2fdda72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
 public abstract class IndexEvent extends MultiSiteEvent {
-  protected IndexEvent(String type) {
-    super(type);
+  protected IndexEvent(String type, String instanceId) {
+    super(type, instanceId);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
index 404d168..f29204b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
@@ -29,7 +29,8 @@
     register(ProjectListUpdateEvent.TYPE, ProjectListUpdateEvent.class);
   }
 
-  protected MultiSiteEvent(String type) {
+  protected MultiSiteEvent(String type, String instanceId) {
     super(type);
+    this.instanceId = instanceId;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
index 8bdb7b5..954befb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
@@ -21,8 +21,8 @@
 
   public String projectName;
 
-  public ProjectIndexEvent(String projectName) {
-    super(TYPE);
+  public ProjectIndexEvent(String projectName, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
index d030e5b..0e18b27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
@@ -22,8 +22,8 @@
   public String projectName;
   public boolean remove;
 
-  public ProjectListUpdateEvent(String projectName, boolean remove) {
-    super(TYPE);
+  public ProjectListUpdateEvent(String projectName, boolean remove, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.remove = remove;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 08b26f7..983a07b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.server.CommentsUtil;
 import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.util.ManualRequestContext;
@@ -42,6 +43,7 @@
   private final OneOffRequestContext oneOffReqCtx;
   private final String changeId;
   private final ChangeFinder changeFinder;
+  private final String instanceId;
   private Optional<Long> computedChangeTs = Optional.empty();
   private Optional<ChangeNotes> changeNotes = Optional.empty();
 
@@ -55,12 +57,14 @@
       CommentsUtil commentsUtil,
       ChangeFinder changeFinder,
       OneOffRequestContext oneOffReqCtx,
+      @GerritInstanceId String instanceId,
       @Assisted String changeId) {
     this.changeFinder = changeFinder;
     this.gitRepoMgr = gitRepoMgr;
     this.commentsUtil = commentsUtil;
     this.oneOffReqCtx = oneOffReqCtx;
     this.changeId = changeId;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -69,7 +73,8 @@
     return getComputedChangeTs()
         .map(
             ts -> {
-              ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, deleted);
+              ChangeIndexEvent event =
+                  new ChangeIndexEvent(projectName, changeId, deleted, instanceId);
               event.eventCreatedOn = ts;
               event.targetSha = getBranchTargetSha();
               return event;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index eef3e4b..02f1b1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.extensions.events.GroupIndexedListener;
 import com.google.gerrit.extensions.events.ProjectIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
@@ -48,6 +49,7 @@
   private final ChangeCheckerImpl.Factory changeChecker;
   private final ProjectsFilter projectsFilter;
   private final GroupChecker groupChecker;
+  private final String instanceId;
 
   @Inject
   IndexEventHandler(
@@ -55,18 +57,20 @@
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
       ProjectsFilter projectsFilter,
-      GroupChecker groupChecker) {
+      GroupChecker groupChecker,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
     this.projectsFilter = projectsFilter;
     this.groupChecker = groupChecker;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onAccountIndexed(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id));
+      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -87,7 +91,8 @@
   public void onGroupIndexed(String groupUUID) {
     if (!Context.isForwardedEvent()) {
       IndexGroupTask task =
-          new IndexGroupTask(new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID)));
+          new IndexGroupTask(
+              new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID), instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -97,7 +102,7 @@
   @Override
   public void onProjectIndexed(String projectName) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
-      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName));
+      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -132,7 +137,7 @@
 
   private void executeDeleteChangeTask(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true));
+      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
deleted file mode 100644
index 2df60dd..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.gerrit.server.config.SitePaths;
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ModuleTest {
-
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private Configuration configMock;
-
-  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  private Module module;
-
-  @Before
-  public void setup() {
-    module = new Module(configMock);
-  }
-
-  @Test
-  public void shouldGetInstanceId() throws Exception {
-    File tmpSitePath = tempFolder.newFolder();
-    File tmpPluginDataPath =
-        Paths.get(tmpSitePath.getPath(), "data", Configuration.PLUGIN_NAME).toFile();
-    tmpPluginDataPath.mkdirs();
-    Path path = Paths.get(tmpPluginDataPath.getPath(), Configuration.INSTANCE_ID_FILE);
-    SitePaths sitePaths = new SitePaths(Paths.get(tmpSitePath.getPath()));
-    assertThat(path.toFile().exists()).isFalse();
-
-    UUID gotUUID1 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isNotNull();
-    assertThat(path.toFile().exists()).isTrue();
-
-    UUID gotUUID2 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isEqualTo(gotUUID2);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index bb63caf..7d1751c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -1,6 +1,8 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -10,8 +12,8 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -24,20 +26,19 @@
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
   @Mock MessageLogger msgLog;
-  private UUID instanceId = UUID.randomUUID();
   private String topic = "index";
 
   private BrokerApiWrapper objectUnderTest;
 
   @Before
   public void setUp() {
+    event.instanceId = "instance-id";
     objectUnderTest =
         new BrokerApiWrapper(
             MoreExecutors.directExecutor(),
             DynamicItem.itemOf(BrokerApi.class, brokerApi),
             brokerMetrics,
-            msgLog,
-            instanceId);
+            msgLog);
   }
 
   @Test
@@ -63,10 +64,26 @@
     when(brokerApi.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.send(topic, event);
+      objectUnderTest.sendSync(topic, event);
     } catch (RuntimeException e) {
       // expected
     }
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
+
+  @Test
+  public void shouldSkipMessageSendingWhenInstanceIdIsNull() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = null;
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event));
+  }
+
+  @Test
+  public void shouldSkipMessageSendingWhenInstanceIdIsEmpty() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = "";
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index 67be583..ce222d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -36,9 +36,10 @@
 
   @Test
   public void shouldNotPublishAccountsCacheEvictions() {
-
+    String instanceId = "instance-id";
     final CacheEvictionHandler<String, String> handler =
-        new CacheEvictionHandler<>(DynamicSet.emptySet(), executorMock, defaultCacheMatcher);
+        new CacheEvictionHandler<>(
+            DynamicSet.emptySet(), executorMock, defaultCacheMatcher, instanceId);
 
     handler.onRemoval(
         "test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index 4263ddb..c8216bd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -41,6 +41,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private ProjectListUpdateHandler handler;
 
   @Mock private ProjectListUpdateForwarder forwarder;
@@ -51,7 +53,7 @@
     when(projectsFilter.matches(any(String.class))).thenReturn(true);
     handler =
         new ProjectListUpdateHandler(
-            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
+            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
   }
 
   private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -69,7 +71,8 @@
     handler.onNewProjectCreated(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID)));
   }
 
   @Test
@@ -80,7 +83,8 @@
     handler.onProjectDeleted(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
@@ -101,18 +105,22 @@
     handler.onNewProjectCreated(event);
     verify(forwarder, never())
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
   public void testProjectUpdateTaskToString() throws Exception {
     String projectName = "someProjectName";
     ProjectListUpdateTask task =
-        handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false));
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(String.format("Update project list in target instance: add '%s'", projectName));
 
-    task = handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true));
+    task =
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(
             String.format("Update project list in target instance: remove '%s'", projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index c27dea2..aa17c36 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -19,18 +19,17 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Suppliers;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
 import java.net.URISyntaxException;
 import java.util.Optional;
-import java.util.UUID;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,11 +47,9 @@
   @Mock private ProjectVersionLogger verLogger;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
   private SubscriberMetrics metrics;
-  private EventMessage.Header msgHeader;
 
   @Before
   public void setup() throws Exception {
-    msgHeader = new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID());
     metrics = new SubscriberMetrics(metricMaker, projectVersionRefUpdate, verLogger);
   }
 
@@ -64,7 +61,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -80,7 +77,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -99,7 +96,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage refUpdateEventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event refUpdateEventMessage = newRefUpdateEvent();
     metrics.updateReplicationStatusMetrics(refUpdateEventMessage);
 
     assertThat(metrics.getReplicationStatus(A_TEST_PROJECT_NAME)).isEqualTo(replicationLagSecs);
@@ -108,7 +105,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
 
-    EventMessage projectDeleteEventMessage = new EventMessage(msgHeader, projectDeletionSuccess());
+    Event projectDeleteEventMessage = projectDeletionSuccess();
     metrics.updateReplicationStatusMetrics(projectDeleteEventMessage);
 
     verify(verLogger).logDeleted(A_TEST_PROJECT_NAME_KEY);
@@ -117,7 +114,7 @@
   @Test
   public void shouldNotLogUponProjectDeletionSuccessWhenSubscriberMetricsDoNotExist()
       throws Exception {
-    EventMessage eventMessage = new EventMessage(msgHeader, projectDeletionSuccess());
+    Event eventMessage = projectDeletionSuccess();
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
 
@@ -131,7 +128,7 @@
 
   @Test
   public void shouldNotLogUponProjectDeletionSuccessWhenLocalVersionStillExists() throws Exception {
-    EventMessage eventMessage = new EventMessage(msgHeader, projectDeletionSuccess());
+    Event eventMessage = projectDeletionSuccess();
     Optional<Long> anyRefVersionValue = Optional.of(System.currentTimeMillis() / 1000);
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(anyRefVersionValue);
@@ -151,7 +148,7 @@
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -160,7 +157,7 @@
 
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(Optional.empty());
-    EventMessage projectDeleteEvent = new EventMessage(msgHeader, projectDeletionSuccess());
+    Event projectDeleteEvent = projectDeletionSuccess();
 
     metrics.updateReplicationStatusMetrics(projectDeleteEvent);
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index bf5c5d9..50507e7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -31,6 +31,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class CacheEvictionEventRouterTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private CacheEvictionEventRouter router;
   @Mock private ForwardedCacheEvictionHandler cacheEvictionHandler;
 
@@ -41,7 +42,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_CacheEviction() throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -50,7 +51,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectCacheEvictionWithSlash()
       throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/project");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/project", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index f22cb14..8efa2ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -44,7 +44,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventRouterTest {
-
+  private static final String INSTANCE_ID = "instance-id";
   private IndexEventRouter router;
   @Mock private ForwardedIndexAccountHandler indexAccountHandler;
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
@@ -66,7 +66,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_AccountIndex() throws Exception {
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -80,7 +80,7 @@
 
     StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
 
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -96,7 +96,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
-    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId());
+    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId(), INSTANCE_ID);
     router.route(event);
 
     verify(indexGroupHandler)
@@ -108,7 +108,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectIndex() throws Exception {
     final String projectName = "projectName";
-    final ProjectIndexEvent event = new ProjectIndexEvent(projectName);
+    final ProjectIndexEvent event = new ProjectIndexEvent(projectName, INSTANCE_ID);
     router.route(event);
 
     verify(indexProjectHandler)
@@ -119,7 +119,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndex() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -133,7 +133,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndexDelete() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -147,7 +147,7 @@
 
   @Test
   public void routerShouldFailForNotRecognisedEvents() throws Exception {
-    final IndexEvent newEventType = new IndexEvent("new-type") {};
+    final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
 
     assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
     verifyZeroInteractions(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 93daf92..8a21c39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -38,7 +38,8 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectListUpdate() throws Exception {
-    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false);
+    String instanceId = "instance-id";
+    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false, instanceId);
     router.route(event);
 
     verify(projectListUpdateHandler).update(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index 74d8917..6aab497 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -70,7 +70,7 @@
   public static class TestEvent extends MultiSiteEvent {
 
     protected TestEvent() {
-      super("test");
+      super("test", "instance-id");
     }
   }
 
@@ -92,7 +92,7 @@
   @Test
   public void shouldSendEventToBrokerFromGenericSourceThread() {
     brokerForwarder.send(newForwarderTask(), testTopic, testEvent);
-    verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+    verify(brokerMock).sendSync(eq(testTopicName), eq(testEvent));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index fb2a50b..b3d46cf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Sets;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.extensions.registration.RegistrationHandle;
 import com.google.gerrit.server.cache.CacheRemovalListener;
@@ -121,9 +122,10 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldEvictProjectCache() throws Exception {
     objectUnderTest.route(
-        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, gson.toJson(project)));
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, gson.toJson(project), "instance-id"));
     evictionsCacheTracker.waitForExpectedEvictions();
 
     assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index e64759c..452a0d2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -104,7 +104,9 @@
   public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
     setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
     handler.index(
-        TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false)));
+        TEST_CHANGE_ID,
+        Operation.INDEX,
+        Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
     verify(indexerMock, times(1)).index(any(Change.class));
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 65d8d20..b49dbfa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -143,6 +143,6 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
-    return Optional.of(new GroupIndexEvent(uuid, null));
+    return Optional.of(new GroupIndexEvent(uuid, null, "instance-id"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
index 9893ce7..2412c79 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
@@ -34,6 +34,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private static final String PROJECT_NAME = "someProject";
   private static final String SOME_MESSAGE = "someMessage";
   private static final Project.NameKey PROJECT_KEY = Project.nameKey(PROJECT_NAME);
@@ -48,13 +49,13 @@
 
   @Test
   public void testSuccessfulAdd() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
   }
 
   @Test
   public void testSuccessfulRemove() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     verify(projectCacheMock).remove(PROJECT_KEY);
   }
 
@@ -72,7 +73,7 @@
         .onCreateProject(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
@@ -92,7 +93,7 @@
         .remove(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).remove(PROJECT_KEY);
@@ -113,7 +114,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
@@ -135,7 +136,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
index 6403cce..09f8f59 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -101,7 +101,7 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid, @Nullable ObjectId sha1) {
-    return Optional.of(new GroupIndexEvent(uuid, sha1));
+    return Optional.of(new GroupIndexEvent(uuid, sha1, "instance-id"));
   }
 
   private void setCommitExistsInRepo(boolean commitExists) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
index 77d00ac..099f1dd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
@@ -24,13 +24,14 @@
 import org.junit.Test;
 
 public class GroupEventIndexTest {
+  private static final String INSTANCE_ID = "instance-id";
   private static final Gson gson = new EventGsonProvider().get();
 
   @Test
   public void groupEventIndexRoundTripWithSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
     ObjectId anObjectId = ObjectId.fromString("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
@@ -38,7 +39,7 @@
   @Test
   public void groupEventIndexRoundTripWithoutSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 660a302..3fd13c6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -36,6 +36,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private IndexEventHandler eventHandler;
 
   @Mock private ProjectsFilter projectsFilter;
@@ -50,7 +52,8 @@
             asDynamicSet(forwarder),
             changeChecker,
             projectsFilter,
-            new TestGroupChecker(true));
+            new TestGroupChecker(true),
+            INSTANCE_ID);
   }
 
   private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -65,7 +68,7 @@
 
     eventHandler.onProjectIndexed("test_project");
     verify(forwarder, never())
-        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project")));
+        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
   }
 
   @Test