Merge branch 'stable-3.0'

* stable-3.0:
  Remove kafka related code from multi-site
  Fix NPE when removing refs
  Leverage BrokerApi interface

Change-Id: Icd5ba630ac6b2fc2f4fe83dcf13510c83415860e
diff --git a/BUILD b/BUILD
index 6736634..eb2eeba 100644
--- a/BUILD
+++ b/BUILD
@@ -17,8 +17,8 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-        "@kafka-client//jar",
         "@global-refdb//jar",
+        "@events-broker//jar",
         "//plugins/replication",
     ],
 )
@@ -43,9 +43,7 @@
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":multi-site__plugin",
         "@wiremock//jar",
-        "@kafka-client//jar",
-        "@testcontainers-kafka//jar",
-        "//lib/testcontainers",
         "@global-refdb//jar",
+        "@events-broker//jar",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index ca03c9d..7e8fee4 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -8,19 +8,13 @@
     )
 
     maven_jar(
-        name = "kafka-client",
-        artifact = "org.apache.kafka:kafka-clients:2.1.0",
-        sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
-    )
-
-    maven_jar(
-        name = "testcontainers-kafka",
-        artifact = "org.testcontainers:kafka:1.11.3",
-        sha1 = "932d1baa2541f218b1b44a0546ae83d530011468",
-    )
-
-    maven_jar(
         name = "global-refdb",
         artifact = "com.gerritforge:global-refdb:3.1.0-rc1",
         sha1 = "61fc8defaed9c364e6bfa101563e434fcc70038f",
     )
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.1.2",
+        sha1 = "b4ed20d7be8a7023111c511ca5dc00ec18e9313a",
+    )
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index f0f56dc..3cdf940 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -36,4 +36,13 @@
 [plugins]
     allowRemoteAdmin = true
 [plugin "websession-flatfile"]
-    directory = $FAKE_NFS
\ No newline at end of file
+    directory = $FAKE_NFS
+[plugin "kafka-events"]
+    bootstrapServers = localhost:$KAFKA_PORT
+    groupId = $KAFKA_GROUP_ID
+    numberOfSubscribers = 4
+    securityProtocol = PLAINTEXT
+    pollingIntervalMs = 1000
+    enableAutoCommit = true
+    autoCommitIntervalMs = 1000
+    autoOffsetReset = latest
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index 5287f61..442c37e 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -1,17 +1,9 @@
 [index]
 	maxTries = 50
 	retryInterval = 30000
-[kafka]
-	bootstrapServers = localhost:$KAFKA_PORT
-	securityProtocol = PLAINTEXT
-	indexEventTopic = gerrit_index
-	streamEventTopic = gerrit_stream
-	projectListEventTopic = gerrit_list_project
-	cacheEventTopic = gerrit_cache_eviction
-[kafka "subscriber"]
-	pollingIntervalMs = 1000
-	KafkaProp-enableAutoCommit = true
-	KafkaProp-autoCommitIntervalMs = 1000
-	KafkaProp-autoOffsetReset = latest
-[ref-database "zookeeper"]
-	connectString = localhost:$ZK_PORT
+
+[broker]
+        indexEventTopic = gerrit_index
+        streamEventTopic = gerrit_stream
+        projectListEventTopic = gerrit_list_project
+        cacheEventTopic = gerrit_cache_eviction
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index faa6548..bcbaf9c 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -62,6 +62,7 @@
 		export GERRIT_HOSTNAME=$7
 		export REPLICATION_HOSTNAME=$8
 		export REMOTE_DEBUG_PORT=$9
+		export KAFKA_GROUP_ID=${10}
 		export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
 		echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -105,19 +106,21 @@
 	GERRIT_SITE1_SSHD_PORT=$3
 	CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
 	GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
+	GERRIT_SITE1_KAFKA_GROUP_ID="instance-1"
 	# SITE 2
 	GERRIT_SITE2_HOSTNAME=$4
 	GERRIT_SITE2_HTTPD_PORT=$5
 	GERRIT_SITE2_SSHD_PORT=$6
 	CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
 	GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
+	GERRIT_SITE2_KAFKA_GROUP_ID="instance-2"
 
 	# Set config SITE1
-	copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT
+	copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_KAFKA_GROUP_ID
 
 
 	# Set config SITE2
-	copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT
+	copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
 }
 
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 14c6be6..ebe28d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,6 +18,7 @@
 import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
@@ -62,6 +63,7 @@
   private final Supplier<Index> index;
   private final Supplier<SharedRefDatabase> sharedRefDb;
   private final Supplier<Collection<Message>> replicationConfigValidation;
+  private final Supplier<Broker> broker;
   private final Config multiSiteConfig;
 
   @Inject
@@ -78,6 +80,7 @@
     event = memoize(() -> new Event(lazyMultiSiteCfg));
     index = memoize(() -> new Index(lazyMultiSiteCfg));
     sharedRefDb = memoize(() -> new SharedRefDatabase(lazyMultiSiteCfg));
+    broker = memoize(() -> new Broker(lazyMultiSiteCfg));
   }
 
   public Config getMultiSiteConfig() {
@@ -100,6 +103,10 @@
     return index.get();
   }
 
+  public Broker broker() {
+    return broker.get();
+  }
+
   public Collection<Message> validate() {
     return replicationConfigValidation.get();
   }
@@ -278,6 +285,19 @@
     }
   }
 
+  public static class Broker {
+    static final String BROKER_SECTION = "broker";
+    private final Config cfg;
+
+    Broker(Supplier<Config> cfgSupplier) {
+      cfg = cfgSupplier.get();
+    }
+
+    public String getTopic(String topicKey, String defValue) {
+      return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
+    }
+  }
+
   static boolean getBoolean(
       Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
     try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index db7dd63..7c88655 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,12 +14,14 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
 import com.google.gerrit.server.util.PluginLogFile;
 import com.google.gerrit.server.util.SystemLog;
+import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import org.apache.log4j.PatternLayout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,15 +30,18 @@
 public class Log4jMessageLogger extends PluginLogFile implements MessageLogger {
   private static final String LOG_NAME = "message_log";
   private final Logger msgLog;
+  private final Gson gson;
 
   @Inject
-  public Log4jMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+  public Log4jMessageLogger(
+      SystemLog systemLog, ServerInformation serverInfo, EventGsonProvider gsonProvider) {
     super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
-    msgLog = LoggerFactory.getLogger(LOG_NAME);
+    this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+    this.gson = gsonProvider.get();
   }
 
   @Override
-  public void log(Direction direction, SourceAwareEventWrapper event) {
-    msgLog.info("{} Header[{}] Body[{}]", direction, event.getHeader(), event.getBody());
+  public void log(Direction direction, String topic, EventMessage event) {
+    msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
index ae5d96c..6a73d19 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
@@ -63,17 +63,19 @@
           RevWalk walk = new RevWalk(repository)) {
         GitPerson committer = null;
         String commitMessage = null;
-        int objectType = walk.parseAny(newRefValue).getType();
-        switch (objectType) {
-          case OBJ_COMMIT:
-            RevCommit commit = walk.parseCommit(newRefValue);
-            committer = CommonConverters.toGitPerson(commit.getCommitterIdent());
-            commitMessage = commit.getShortMessage();
-            break;
-          case OBJ_BLOB:
-            break;
-          default:
-            throw new IncorrectObjectTypeException(newRefValue, Constants.typeString(objectType));
+        if (newRefValue != null) {
+          int objectType = walk.parseAny(newRefValue).getType();
+          switch (objectType) {
+            case OBJ_COMMIT:
+              RevCommit commit = walk.parseCommit(newRefValue);
+              committer = CommonConverters.toGitPerson(commit.getCommitterIdent());
+              commitMessage = commit.getShortMessage();
+              break;
+            case OBJ_BLOB:
+              break;
+            default:
+              throw new IncorrectObjectTypeException(newRefValue, Constants.typeString(objectType));
+          }
         }
         sharedRefDBLog.info(
             gson.toJson(
@@ -81,7 +83,7 @@
                     project,
                     currRef.getName(),
                     currRef.getObjectId().getName(),
-                    newRefValue.getName(),
+                    newRefValue == null ? ObjectId.zeroId().name() : newRefValue.getName(),
                     committer,
                     commitMessage)));
       } catch (IOException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index 8b07115..b1f3e79 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
 
 public interface MessageLogger {
 
@@ -23,5 +23,5 @@
     CONSUME;
   }
 
-  public void log(Direction direction, SourceAwareEventWrapper event);
+  public void log(Direction direction, String topic, EventMessage event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b796f42..920cf41 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -15,9 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
-import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.CreationException;
@@ -30,9 +28,9 @@
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -73,7 +71,10 @@
     listener().to(Log4jMessageLogger.class);
     bind(MessageLogger.class).to(Log4jMessageLogger.class);
 
+    install(brokerModule);
+
     install(new ForwarderModule());
+    install(new BrokerForwarderModule());
 
     if (config.cache().synchronize()) {
       install(new CacheModule());
@@ -85,14 +86,7 @@
       install(new IndexModule());
     }
 
-    install(brokerModule);
-
     install(new RouterModule());
-
-    if (config.getSharedRefDb().isEnabled()) {
-      DynamicSet.bind(binder(), ProjectDeletedListener.class)
-          .to(ProjectDeletedSharedDbCleanup.class);
-    }
   }
 
   @Provides
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index f0ad970..bd90be4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,24 +15,17 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
-import com.google.inject.Scopes;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 
 public class PluginModule extends LifecycleModule {
   private Configuration config;
-  private KafkaBrokerModule kafkaBrokerModule;
 
   @Inject
-  public PluginModule(Configuration config, KafkaBrokerModule kafkaBrokerModule) {
+  public PluginModule(Configuration config) {
     this.config = config;
-    this.kafkaBrokerModule = kafkaBrokerModule;
   }
 
   @Override
@@ -42,8 +35,5 @@
       DynamicSet.bind(binder(), ProjectDeletedListener.class)
           .to(ProjectDeletedSharedDbCleanup.class);
     }
-    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
-    listener().to(KafkaBrokerApi.class);
-    install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
deleted file mode 100644
index 35350e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.function.Consumer;
-
-/** API for sending/receiving events through a message Broker. */
-public interface BrokerApi {
-
-  /**
-   * Send an event to a topic.
-   *
-   * @param topic
-   * @param event
-   * @return true if the event was successfully sent. False otherwise.
-   */
-  boolean send(String topic, Event event);
-
-  /**
-   * Receive asynchronously events from a topic.
-   *
-   * @param topic
-   * @param eventConsumer
-   */
-  void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
index 19cf0f7..b38ac6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -14,17 +14,39 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import java.util.Set;
 import java.util.function.Consumer;
 
 public class BrokerApiNoOp implements BrokerApi {
+  private final Set<TopicSubscriber> topicSubscribers;
+
+  @Inject
+  public BrokerApiNoOp() {
+    topicSubscribers = Sets.newHashSet();
+  }
 
   @Override
-  public boolean send(String topic, Event event) {
+  public boolean send(String topic, EventMessage event) {
     return true;
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+    topicSubscribers.add(TopicSubscriber.topicSubscriber(topic, eventConsumer));
+  }
+
+  @Override
+  public void disconnect() {
+    topicSubscribers.clear();
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return topicSubscribers;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index e83fe53..b266eb0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -14,29 +14,53 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 
 public class BrokerApiWrapper implements BrokerApi {
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
+  private final MessageLogger msgLog;
+  private final UUID instanceId;
 
   @Inject
-  public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics) {
+  public BrokerApiWrapper(
+      DynamicItem<BrokerApi> apiDelegate,
+      BrokerMetrics metrics,
+      MessageLogger msgLog,
+      @InstanceId UUID instanceId) {
     this.apiDelegate = apiDelegate;
     this.metrics = metrics;
+    this.msgLog = msgLog;
+    this.instanceId = instanceId;
+  }
+
+  public boolean send(String topic, Event event) {
+    return send(topic, apiDelegate.get().newMessage(instanceId, event));
   }
 
   @Override
-  public boolean send(String topic, Event event) {
+  public boolean send(String topic, EventMessage message) {
+    if (Context.isForwardedEvent()) {
+      return true;
+    }
     boolean succeeded = false;
     try {
-      succeeded = apiDelegate.get().send(topic, event);
+      succeeded = apiDelegate.get().send(topic, message);
     } finally {
       if (succeeded) {
+        msgLog.log(Direction.PUBLISH, topic, message);
         metrics.incrementBrokerPublishedMessage();
       } else {
         metrics.incrementBrokerFailedToPublishMessage();
@@ -46,7 +70,17 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
-    apiDelegate.get().receiveAync(topic, eventConsumer);
+  public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+    apiDelegate.get().receiveAsync(topic, messageConsumer);
+  }
+
+  @Override
+  public void disconnect() {
+    apiDelegate.get().disconnect();
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return apiDelegate.get().topicSubscribers();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index 6983984..093b920 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
deleted file mode 100644
index b04fbff..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-public interface BrokerSession {
-
-  boolean isOpen();
-
-  void connect();
-
-  void disconnect();
-
-  boolean publish(String topic, String payload);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
deleted file mode 100644
index 743d323..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class BrokerPublisher implements LifecycleListener {
-  protected final Logger log = LoggerFactory.getLogger(getClass());
-
-  private final BrokerSession session;
-  private final Gson gson;
-  private final UUID instanceId;
-  private final MessageLogger msgLog;
-
-  @Inject
-  public BrokerPublisher(
-      BrokerSession session,
-      @EventGson Gson gson,
-      @InstanceId UUID instanceId,
-      MessageLogger msgLog) {
-    this.session = session;
-    this.gson = gson;
-    this.instanceId = instanceId;
-    this.msgLog = msgLog;
-  }
-
-  @Override
-  public void start() {
-    if (!session.isOpen()) {
-      session.connect();
-    }
-  }
-
-  @Override
-  public void stop() {
-    if (session.isOpen()) {
-      session.disconnect();
-    }
-  }
-
-  public boolean publish(String topic, Event event) {
-    if (Context.isForwardedEvent()) {
-      return true;
-    }
-
-    SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
-    Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
-    if (eventPublished) {
-      msgLog.log(Direction.PUBLISH, brokerEvent);
-    }
-    return eventPublished;
-  }
-
-  private String getPayload(SourceAwareEventWrapper event) {
-    return gson.toJson(event);
-  }
-
-  private SourceAwareEventWrapper toBrokerEvent(Event event) {
-    JsonObject body = eventToJson(event);
-    return new SourceAwareEventWrapper(
-        new SourceAwareEventWrapper.EventHeader(
-            UUID.randomUUID(), event.getType(), instanceId, event.eventCreatedOn),
-        body);
-  }
-
-  @VisibleForTesting
-  public JsonObject eventToJson(Event event) {
-    return gson.toJsonTree(event).getAsJsonObject();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
deleted file mode 100644
index 6594544..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSession implements BrokerSession {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
-  private KafkaConfiguration properties;
-  private final UUID instanceId;
-  private volatile Producer<String, String> producer;
-
-  @Inject
-  public KafkaSession(KafkaConfiguration kafkaConfig, @InstanceId UUID instanceId) {
-    this.properties = kafkaConfig;
-    this.instanceId = instanceId;
-  }
-
-  @Override
-  public boolean isOpen() {
-    if (producer != null) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public void connect() {
-    if (isOpen()) {
-      LOGGER.debug("Already connected.");
-      return;
-    }
-
-    LOGGER.info("Connect to {}...", properties.getKafka().getBootstrapServers());
-    /* Need to make sure that the thread of the running connection uses
-     * the correct class loader otherwize you can endup with hard to debug
-     * ClassNotFoundExceptions
-     */
-    setConnectionClassLoader();
-    producer = new KafkaProducer<>(properties.kafkaPublisher());
-    LOGGER.info("Connection established.");
-  }
-
-  private void setConnectionClassLoader() {
-    Thread.currentThread().setContextClassLoader(KafkaSession.class.getClassLoader());
-  }
-
-  @Override
-  public void disconnect() {
-    LOGGER.info("Disconnecting...");
-    if (producer != null) {
-      LOGGER.info("Closing Producer {}...", producer);
-      producer.close();
-    }
-    producer = null;
-  }
-
-  @Override
-  public boolean publish(String topic, String payload) {
-    return publishToTopic(properties.getKafka().getTopicAlias(EventTopic.of(topic)), payload);
-  }
-
-  private boolean publishToTopic(String topic, String payload) {
-    Future<RecordMetadata> future =
-        producer.send(new ProducerRecord<>(topic, instanceId.toString(), payload));
-    try {
-      RecordMetadata metadata = future.get();
-      LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
-      return true;
-    } catch (InterruptedException | ExecutionException e) {
-      LOGGER.error("Cannot send the message", e);
-      return false;
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 3d1046f..ec6072c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,76 +14,73 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
 import com.google.gerrit.server.permissions.PermissionBackendException;
-import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import java.io.IOException;
 import java.util.UUID;
+import java.util.function.Consumer;
 
-public abstract class AbstractSubcriber implements Runnable {
+public abstract class AbstractSubcriber {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final BrokerApi brokerApi;
   private final ForwardedEventRouter eventRouter;
   private final DynamicSet<DroppedEventListener> droppedEventListeners;
-  private final Gson gson;
   private final UUID instanceId;
   private final MessageLogger msgLog;
   private SubscriberMetrics subscriberMetrics;
+  private final Configuration cfg;
+  private final String topic;
 
   public AbstractSubcriber(
-      BrokerApiWrapper brokerApi,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gson,
       @InstanceId UUID instanceId,
       MessageLogger msgLog,
-      SubscriberMetrics subscriberMetrics) {
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
-    this.gson = gson;
     this.instanceId = instanceId;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
-    this.brokerApi = brokerApi;
-  }
-
-  @Override
-  public void run() {
-    brokerApi.receiveAync(getTopic().topic(), this::processRecord);
+    this.cfg = cfg;
+    this.topic = getTopic().topic(cfg);
   }
 
   protected abstract EventTopic getTopic();
 
-  private void processRecord(SourceAwareEventWrapper event) {
+  public Consumer<EventMessage> getConsumer() {
+    return this::processRecord;
+  }
 
-    if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
+  private void processRecord(EventMessage event) {
+
+    if (event.getHeader().sourceInstanceId.equals(instanceId)) {
       logger.atFiner().log(
           "Dropping event %s produced by our instanceId %s",
           event.toString(), instanceId.toString());
       droppedEventListeners.forEach(l -> l.onEventDropped(event));
     } else {
       try {
-        msgLog.log(Direction.CONSUME, event);
-        eventRouter.route(event.getEventBody(gson));
+        msgLog.log(Direction.CONSUME, topic, event);
+        eventRouter.route(event.getEvent());
         subscriberMetrics.incrementSubscriberConsumedMessage();
       } catch (IOException e) {
         logger.atSevere().withCause(e).log(
-            "Malformed event '%s': [Exception: %s]", event.getHeader().getEventType());
+            "Malformed event '%s': [Exception: %s]", event.getHeader(), e.getMessage());
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       } catch (PermissionBackendException | CacheNotFoundException e) {
         logger.atSevere().withCause(e).log(
-            "Cannot handle message %s: [Exception: %s]", event.getHeader().getEventType());
+            "Cannot handle message %s: [Exception: %s]", event.getHeader(), e.getMessage());
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 53aae99..6c67c46 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,13 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
@@ -30,21 +28,14 @@
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gsonProvider,
       @InstanceId UUID instanceId,
       MessageLogger msgLog,
-      SubscriberMetrics subscriberMetrics) {
-    super(
-        brokerApi,
-        eventRouter,
-        droppedEventListeners,
-        gsonProvider,
-        instanceId,
-        msgLog,
-        subscriberMetrics);
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
+
+    super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
deleted file mode 100644
index 936d07a..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 680e8ed..6f4680c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,11 +14,13 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+
 public interface DroppedEventListener {
   /**
    * Invoked when any event is dropped.
    *
    * @param event information about the event.
    */
-  void onEventDropped(SourceAwareEventWrapper event);
+  void onEventDropped(EventMessage event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index eacccbf..696cf03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -15,13 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import java.util.UUID;
@@ -30,21 +28,14 @@
 public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      BrokerApiWrapper brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gsonProvider,
       @InstanceId UUID instanceId,
       MessageLogger msgLog,
-      SubscriberMetrics subscriberMetrics) {
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
     super(
-        brokerApi,
-        eventRouter,
-        droppedEventListeners,
-        gsonProvider,
-        instanceId,
-        msgLog,
-        subscriberMetrics);
+        eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
index 1778961..f14dde5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
@@ -14,36 +14,42 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.util.concurrent.ExecutorService;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 
 @Singleton
 public class MultiSiteConsumerRunner implements LifecycleListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final DynamicSet<AbstractSubcriber> consumers;
-  private final ExecutorService executor;
+  private final DynamicItem<BrokerApi> brokerApi;
+  private final Configuration cfg;
 
   @Inject
   public MultiSiteConsumerRunner(
-      @ConsumerExecutor ExecutorService executor, DynamicSet<AbstractSubcriber> consumers) {
+      DynamicItem<BrokerApi> brokerApi,
+      DynamicSet<AbstractSubcriber> consumers,
+      Configuration cfg) {
     this.consumers = consumers;
-    this.executor = executor;
+    this.brokerApi = brokerApi;
+    this.cfg = cfg;
   }
 
   @Override
   public void start() {
     logger.atInfo().log("starting consumers");
-    consumers.forEach(c -> executor.execute(c));
+    consumers.forEach(
+        consumer ->
+            brokerApi.get().receiveAsync(consumer.getTopic().topic(cfg), consumer.getConsumer()));
   }
 
   @Override
-  public void stop() {
-    logger.atInfo().log("shutting down consumers");
-    executor.shutdown();
-  }
+  public void stop() {}
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4fa7f64..bf8b33d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -15,13 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import java.util.UUID;
@@ -30,15 +28,14 @@
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      BrokerApiWrapper brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gson,
       @InstanceId UUID instanceId,
       MessageLogger msgLog,
-      SubscriberMetrics subscriberMetrics) {
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
     super(
-        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
+        eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
deleted file mode 100644
index b8bd0d8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.UUID;
-
-public class SourceAwareEventWrapper {
-
-  private final EventHeader header;
-  private final JsonObject body;
-
-  public EventHeader getHeader() {
-    return header;
-  }
-
-  public JsonObject getBody() {
-    return body;
-  }
-
-  public Event getEventBody(Gson gson) {
-    return gson.fromJson(this.body, Event.class);
-  }
-
-  public static class EventHeader {
-    private final UUID eventId;
-    private final String eventType;
-    private final UUID sourceInstanceId;
-    private final Long eventCreatedOn;
-
-    public EventHeader(UUID eventId, String eventType, UUID sourceInstanceId, Long eventCreatedOn) {
-      this.eventId = eventId;
-      this.eventType = eventType;
-      this.sourceInstanceId = sourceInstanceId;
-      this.eventCreatedOn = eventCreatedOn;
-    }
-
-    public UUID getEventId() {
-      return eventId;
-    }
-
-    public String getEventType() {
-      return eventType;
-    }
-
-    public UUID getSourceInstanceId() {
-      return sourceInstanceId;
-    }
-
-    public Long getEventCreatedOn() {
-      return eventCreatedOn;
-    }
-
-    public void validate() {
-      requireNonNull(eventId, "EventId cannot be null");
-      requireNonNull(eventType, "EventType cannot be null");
-      requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
-    }
-
-    @Override
-    public String toString() {
-      return "{"
-          + "eventId="
-          + eventId
-          + ", eventType='"
-          + eventType
-          + '\''
-          + ", sourceInstanceId="
-          + sourceInstanceId
-          + ", eventCreatedOn="
-          + eventCreatedOn
-          + '}';
-    }
-  }
-
-  public SourceAwareEventWrapper(EventHeader header, JsonObject body) {
-    this.header = header;
-    this.body = body;
-  }
-
-  public void validate() {
-    requireNonNull(header, "Header cannot be null");
-    requireNonNull(body, "Body cannot be null");
-    header.validate();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 2918657..39ace0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -15,13 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
@@ -30,15 +28,14 @@
 public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gson,
       @InstanceId UUID instanceId,
       MessageLogger msgLog,
-      SubscriberMetrics subscriberMetrics) {
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
     super(
-        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
+        eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index ef10151..36f618e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -26,12 +26,9 @@
   private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
   private static final String SUBSCRIBER_FAILURE_COUNTER =
       "subscriber_msg_consumer_failure_counter";
-  private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
-      "subscriber_msg_consumer_poll_failure_counter";
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
-  private final Counter1<String> subscriberPollFailureCounter;
 
   @Inject
   public SubscriberMetrics(MetricMaker metricMaker) {
@@ -50,15 +47,6 @@
                 .setRate()
                 .setUnit("errors"),
             stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
-
-    this.subscriberPollFailureCounter =
-        metricMaker.newCounter(
-            "multi_site/subscriber/subscriber_message_consumer_poll_failure_counter",
-            new Description("Number of failed attempts to poll messages by the subscriber")
-                .setRate()
-                .setUnit("errors"),
-            stringField(
-                SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
   }
 
   public void incrementSubscriberConsumedMessage() {
@@ -68,8 +56,4 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
-
-  public void incrementSubscriberFailedToPollMessages() {
-    subscriberPollFailureCounter.increment(SUBSCRIBER_POLL_FAILURE_COUNTER);
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index 0a0c350..ee1430c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -16,22 +16,21 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 public class SubscriberModule extends LifecycleModule {
 
   @Override
   protected void configure() {
     MultiSiteEvent.registerEventTypes();
-    bind(ExecutorService.class)
-        .annotatedWith(ConsumerExecutor.class)
-        .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
-    listener().to(MultiSiteConsumerRunner.class);
 
     DynamicSet.setOf(binder(), AbstractSubcriber.class);
     DynamicSet.setOf(binder(), DroppedEventListener.class);
+    listener().to(MultiSiteConsumerRunner.class);
+
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index dff21c1..b32e5ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -16,7 +16,7 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
@@ -24,15 +24,17 @@
 
 @Singleton
 public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
-  private final BrokerApi broker;
+  private final BrokerApiWrapper broker;
+  private final Configuration cfg;
 
   @Inject
-  BrokerCacheEvictionForwarder(BrokerApiWrapper broker) {
+  BrokerCacheEvictionForwarder(BrokerApiWrapper broker, Configuration cfg) {
     this.broker = broker;
+    this.cfg = cfg;
   }
 
   @Override
   public boolean evict(CacheEvictionEvent event) {
-    return broker.send(EventTopic.CACHE_TOPIC.topic(), event);
+    return broker.send(EventTopic.CACHE_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
new file mode 100644
index 0000000..6bd6437
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+
+public class BrokerForwarderModule extends LifecycleModule {
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+    DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
+    DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+        .to(BrokerProjectListUpdateForwarder.class);
+    DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..0b4252d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -15,22 +15,24 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
 public class BrokerIndexEventForwarder implements IndexEventForwarder {
-  private final BrokerApi broker;
+  private final BrokerApiWrapper broker;
+  private final Configuration cfg;
 
   @Inject
-  BrokerIndexEventForwarder(BrokerApiWrapper broker) {
+  BrokerIndexEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
     this.broker = broker;
+    this.cfg = cfg;
   }
 
   @Override
   public boolean index(IndexEvent event) {
-    return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
+    return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 1a8b652..34e0300 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -18,22 +18,24 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
 @Singleton
 public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
-  private final BrokerApi broker;
+  private final BrokerApiWrapper broker;
+  private final Configuration cfg;
 
   @Inject
-  BrokerProjectListUpdateForwarder(BrokerApiWrapper broker) {
+  BrokerProjectListUpdateForwarder(BrokerApiWrapper broker, Configuration cfg) {
     this.broker = broker;
+    this.cfg = cfg;
   }
 
   @Override
   public boolean updateProjectList(ProjectListUpdateEvent event) {
-    return broker.send(PROJECT_LIST_TOPIC.topic(), event);
+    return broker.send(PROJECT_LIST_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index ed3a717..9ff4688 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -17,22 +17,24 @@
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
 public class BrokerStreamEventForwarder implements StreamEventForwarder {
-  private final BrokerApi broker;
+  private final BrokerApiWrapper broker;
+  private final Configuration cfg;
 
   @Inject
-  BrokerStreamEventForwarder(BrokerApiWrapper broker) {
+  BrokerStreamEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
     this.broker = broker;
+    this.cfg = cfg;
   }
 
   @Override
   public boolean send(Event event) {
-    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(), event);
+    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 371abae..eaa2df9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+
 public enum EventTopic {
   INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
@@ -28,8 +30,8 @@
     this.aliasKey = aliasKey;
   }
 
-  public String topic() {
-    return topic;
+  public String topic(Configuration config) {
+    return config.broker().getTopic(topicAliasKey(), topic);
   }
 
   public String topicAliasKey() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
deleted file mode 100644
index ff23b78..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.server.events.Event;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Consumer;
-
-public class KafkaBrokerApi implements BrokerApi, LifecycleListener {
-
-  private final BrokerPublisher publisher;
-  private final Provider<KafkaEventSubscriber> subscriberProvider;
-  private List<KafkaEventSubscriber> subscribers;
-
-  @Inject
-  public KafkaBrokerApi(
-      BrokerPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
-    this.publisher = publisher;
-    this.subscriberProvider = subscriberProvider;
-    subscribers = new ArrayList<>();
-  }
-
-  @Override
-  public boolean send(String topic, Event event) {
-    return publisher.publish(topic, event);
-  }
-
-  @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
-    KafkaEventSubscriber subscriber = subscriberProvider.get();
-    synchronized (subscribers) {
-      subscribers.add(subscriber);
-    }
-    subscriber.subscribe(EventTopic.of(topic), eventConsumer);
-  }
-
-  @Override
-  public void start() {}
-
-  @Override
-  public void stop() {
-    for (KafkaEventSubscriber subscriber : subscribers) {
-      subscriber.shutdown();
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
deleted file mode 100644
index 665ad49..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaBrokerModule extends LifecycleModule {
-
-  @Override
-  protected void configure() {
-    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
-        .to(KafkaEventDeserializer.class);
-
-    DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
-    DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
-    DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
-    DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-
-    listener().to(BrokerPublisher.class);
-    bind(BrokerSession.class).to(KafkaSession.class);
-
-    DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-    DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
-    DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-        .to(BrokerProjectListUpdateForwarder.class);
-    DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
deleted file mode 100644
index 0857334..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import static com.google.common.base.Suppliers.memoize;
-
-import com.google.common.base.CaseFormat;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.eclipse.jgit.lib.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class KafkaConfiguration {
-
-  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
-  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
-  static final String KAFKA_SECTION = "kafka";
-  private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
-
-  private final Supplier<KafkaSubscriber> subscriber;
-  private final Supplier<Kafka> kafka;
-  private final Supplier<KafkaPublisher> publisher;
-
-  @Inject
-  public KafkaConfiguration(PluginConfigFactory configFactory, @PluginName String pluginName) {
-    Config cfg = configFactory.getGlobalPluginConfig(pluginName);
-    kafka = memoize(() -> new Kafka(cfg));
-    publisher = memoize(() -> new KafkaPublisher(cfg));
-    subscriber = memoize(() -> new KafkaSubscriber(cfg));
-  }
-
-  public Kafka getKafka() {
-    return kafka.get();
-  }
-
-  public KafkaSubscriber kafkaSubscriber() {
-    return subscriber.get();
-  }
-
-  private static void applyKafkaConfig(Config config, String subsectionName, Properties target) {
-    for (String section : config.getSubsections(KAFKA_SECTION)) {
-      if (section.equals(subsectionName)) {
-        for (String name : config.getNames(KAFKA_SECTION, section, true)) {
-          if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
-            Object value = config.getString(KAFKA_SECTION, subsectionName, name);
-            String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
-            String propName =
-                CaseFormat.LOWER_CAMEL
-                    .to(CaseFormat.LOWER_HYPHEN, configProperty)
-                    .replaceAll("-", ".");
-            log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
-            target.put(propName, value);
-          }
-        }
-      }
-    }
-    target.put(
-        "bootstrap.servers",
-        getString(
-            config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
-  }
-
-  private static String getString(
-      Config cfg, String section, String subsection, String name, String defaultValue) {
-    String value = cfg.getString(section, subsection, name);
-    if (!Strings.isNullOrEmpty(value)) {
-      return value;
-    }
-    return defaultValue;
-  }
-
-  public KafkaPublisher kafkaPublisher() {
-    return publisher.get();
-  }
-
-  public static class Kafka {
-    private final Map<EventTopic, String> eventTopics;
-    private final String bootstrapServers;
-
-    Kafka(Config config) {
-      this.bootstrapServers =
-          getString(
-              config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
-
-      this.eventTopics = new HashMap<>();
-      for (EventTopic eventTopic : EventTopic.values()) {
-        eventTopics.put(
-            eventTopic,
-            getString(config, KAFKA_SECTION, null, eventTopic.topicAliasKey(), eventTopic.topic()));
-      }
-    }
-
-    public String getTopicAlias(EventTopic topic) {
-      return eventTopics.get(topic);
-    }
-
-    public String getBootstrapServers() {
-      return bootstrapServers;
-    }
-
-    private static String getString(
-        Config cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.getString(section, subsection, name);
-      if (!Strings.isNullOrEmpty(value)) {
-        return value;
-      }
-      return defaultValue;
-    }
-  }
-
-  public static class KafkaPublisher extends Properties {
-    private static final long serialVersionUID = 0L;
-
-    public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-    public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
-
-    private KafkaPublisher(Config kafkaConfig) {
-      setDefaults();
-      applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
-    }
-
-    private void setDefaults() {
-      put("acks", "all");
-      put("retries", 10);
-      put("batch.size", 16384);
-      put("linger.ms", 1);
-      put("buffer.memory", 33554432);
-      put("key.serializer", KAFKA_STRING_SERIALIZER);
-      put("value.serializer", KAFKA_STRING_SERIALIZER);
-      put("reconnect.backoff.ms", 5000L);
-    }
-  }
-
-  public static class KafkaSubscriber extends Properties {
-    private static final long serialVersionUID = 1L;
-
-    static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
-
-    private final Integer pollingInterval;
-    private final Config cfg;
-
-    public KafkaSubscriber(Config kafkaCfg) {
-      this.cfg = kafkaCfg;
-
-      this.pollingInterval =
-          cfg.getInt(
-              KAFKA_SECTION,
-              KAFKA_SUBSCRIBER_SUBSECTION,
-              "pollingIntervalMs",
-              DEFAULT_POLLING_INTERVAL_MS);
-
-      applyKafkaConfig(kafkaCfg, KAFKA_SUBSCRIBER_SUBSECTION, this);
-    }
-
-    public Properties initPropsWith(UUID instanceId) {
-      String groupId =
-          getString(
-              cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
-      this.put("group.id", groupId);
-
-      return this;
-    }
-
-    public Integer getPollingInterval() {
-      return pollingInterval;
-    }
-
-    private String getString(
-        Config cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.getString(section, subsection, name);
-      if (!Strings.isNullOrEmpty(value)) {
-        return value;
-      }
-      return defaultValue;
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
deleted file mode 100644
index 9a5e19f..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.UUID;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-@Singleton
-public class KafkaConsumerFactory {
-  private KafkaConfiguration config;
-
-  @Inject
-  public KafkaConsumerFactory(KafkaConfiguration configuration) {
-    this.config = configuration;
-  }
-
-  public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer, UUID instanceId) {
-    return new KafkaConsumer<>(
-        config.kafkaSubscriber().initPropsWith(instanceId),
-        keyDeserializer,
-        new ByteArrayDeserializer());
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
deleted file mode 100644
index afd2656..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-@Singleton
-public class KafkaEventDeserializer implements Deserializer<SourceAwareEventWrapper> {
-
-  private final StringDeserializer stringDeserializer = new StringDeserializer();
-  private Gson gson;
-
-  // To be used when providing this deserializer with class name (then need to add a configuration
-  // entry to set the gson.provider
-  public KafkaEventDeserializer() {}
-
-  @Inject
-  public KafkaEventDeserializer(@EventGson Gson gson) {
-    this.gson = gson;
-  }
-
-  @Override
-  public void configure(Map<String, ?> configs, boolean isKey) {}
-
-  @Override
-  public SourceAwareEventWrapper deserialize(String topic, byte[] data) {
-    final SourceAwareEventWrapper result =
-        gson.fromJson(stringDeserializer.deserialize(topic, data), SourceAwareEventWrapper.class);
-
-    result.validate();
-
-    return result;
-  }
-
-  @Override
-  public void close() {}
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
deleted file mode 100644
index c72fa0d..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaEventSubscriber {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
-  private final Consumer<byte[], byte[]> consumer;
-  private final OneOffRequestContext oneOffCtx;
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-
-  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
-  private final KafkaConfiguration configuration;
-  private final SubscriberMetrics subscriberMetrics;
-
-  @Inject
-  public KafkaEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
-      @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      SubscriberMetrics subscriberMetrics) {
-
-    this.configuration = configuration;
-    this.oneOffCtx = oneOffCtx;
-    this.subscriberMetrics = subscriberMetrics;
-
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
-      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
-    this.valueDeserializer = valueDeserializer;
-  }
-
-  public void subscribe(
-      EventTopic evenTopic, java.util.function.Consumer<SourceAwareEventWrapper> messageProcessor) {
-    try {
-      final String topic = configuration.getKafka().getTopicAlias(evenTopic);
-      logger.atInfo().log(
-          "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, evenTopic);
-      consumer.subscribe(Collections.singleton(topic));
-      while (!closed.get()) {
-        ConsumerRecords<byte[], byte[]> consumerRecords =
-            consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
-        consumerRecords.forEach(
-            consumerRecord -> {
-              try (ManualRequestContext ctx = oneOffCtx.open()) {
-                SourceAwareEventWrapper event =
-                    valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-                messageProcessor.accept(event);
-              } catch (Exception e) {
-                logger.atSevere().withCause(e).log(
-                    "Malformed event '%s': [Exception: %s]",
-                    new String(consumerRecord.value(), UTF_8));
-                subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-              }
-            });
-      }
-    } catch (WakeupException e) {
-      // Ignore exception if closing
-      if (!closed.get()) throw e;
-    } catch (Exception e) {
-      subscriberMetrics.incrementSubscriberFailedToPollMessages();
-      throw e;
-    } finally {
-      consumer.close();
-    }
-  }
-
-  public void shutdown() {
-    closed.set(true);
-    consumer.wakeup();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 2abc7b0..92fa101 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -5,9 +5,11 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -19,26 +21,30 @@
   @Mock private BrokerMetrics brokerMetrics;
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
+  @Mock MessageLogger msgLog;
+  private UUID instanceId = UUID.randomUUID();
+  private String topic = "index";
 
   private BrokerApiWrapper objectUnderTest;
 
   @Before
   public void setUp() {
     objectUnderTest =
-        new BrokerApiWrapper(DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics);
+        new BrokerApiWrapper(
+            DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
   }
 
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
     when(brokerApi.send(any(), any())).thenReturn(true);
-    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
   @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
     when(brokerApi.send(any(), any())).thenReturn(false);
-    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
 
@@ -47,7 +53,7 @@
     when(brokerApi.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+      objectUnderTest.send(topic, event);
     } catch (RuntimeException e) {
       // expected
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
deleted file mode 100644
index a6d9193..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.only;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.entities.Account;
-import com.google.gerrit.entities.BranchNameKey;
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.extensions.client.ChangeKind;
-import com.google.gerrit.server.data.AccountAttribute;
-import com.google.gerrit.server.data.ApprovalAttribute;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.util.time.TimeUtil;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BrokerPublisherTest {
-
-  @Mock private BrokerMetrics brokerMetrics;
-  @Mock private BrokerSession session;
-  @Mock private MessageLogger msgLog;
-  private BrokerPublisher publisher;
-
-  private Gson gson = new EventGsonProvider().get();
-
-  private String accountName = "Foo Bar";
-  private String accountEmail = "foo@bar.com";
-  private String accountUsername = "foobar";
-  private String approvalType = ChangeKind.REWORK.toString();
-
-  private String approvalDescription = "ApprovalDescription";
-  private String approvalValue = "+2";
-  private String oldApprovalValue = "+1";
-  private Long approvalGrantedOn = 123L;
-  private String commentDescription = "Patch Set 1: Code-Review+2";
-  private String projectName = "project";
-  private String refName = "refs/heads/master";
-  private String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
-  private Long eventCreatedOn = 123L;
-
-  @Before
-  public void setUp() {
-    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog);
-  }
-
-  @Test
-  public void shouldSerializeCommentAddedEvent() {
-
-    Event event = createSampleEvent();
-
-    String expectedSerializedCommentEvent =
-        "{\"author\": {\"name\": \""
-            + accountName
-            + "\",\"email\": \""
-            + accountEmail
-            + "\",\"username\": \""
-            + accountUsername
-            + "\"},\"approvals\": [{\"type\": \""
-            + approvalType
-            + "\",\"description\": \""
-            + approvalDescription
-            + "\",\"value\": \""
-            + approvalValue
-            + "\",\"oldValue\": \""
-            + oldApprovalValue
-            + "\",\"grantedOn\": "
-            + approvalGrantedOn
-            + ",\"by\": {\"name\": \""
-            + accountName
-            + "\",\"email\": \""
-            + accountEmail
-            + "\",\"username\": \""
-            + accountUsername
-            + "\"}}],\"comment\": \""
-            + commentDescription
-            + "\",\"project\": \""
-            + projectName
-            + "\",\"refName\": \""
-            + refName
-            + "\",\"changeKey\": {\"id\": \""
-            + changeId
-            + "\""
-            + "  },\"type\": \"comment-added\",\"eventCreatedOn\": "
-            + eventCreatedOn
-            + "}";
-
-    JsonObject expectedCommentEventJsonObject =
-        gson.fromJson(expectedSerializedCommentEvent, JsonElement.class).getAsJsonObject();
-
-    assertThat(publisher.eventToJson(event)).isEqualTo(expectedCommentEventJsonObject);
-  }
-
-  @Test
-  public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
-    Event event = createSampleEvent();
-    when(session.publish(any(), any())).thenReturn(true);
-    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
-    verify(msgLog, only()).log(any(), any());
-  }
-
-  @Test
-  public void shouldSkipEventPublishedLoggingWhenMessagePublishigFailed() {
-    Event event = createSampleEvent();
-    when(session.publish(any(), any())).thenReturn(false);
-
-    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
-    verify(msgLog, never()).log(any(), any());
-  }
-
-  private Event createSampleEvent() {
-    final Change change =
-        new Change(
-            Change.key(changeId),
-            Change.id(1),
-            Account.id(1),
-            BranchNameKey.create(projectName, refName),
-            TimeUtil.nowTs());
-
-    CommentAddedEvent event = new CommentAddedEvent(change);
-    AccountAttribute accountAttribute = new AccountAttribute();
-    accountAttribute.email = accountEmail;
-    accountAttribute.name = accountName;
-    accountAttribute.username = accountUsername;
-
-    event.eventCreatedOn = eventCreatedOn;
-    event.approvals =
-        () -> {
-          ApprovalAttribute approvalAttribute = new ApprovalAttribute();
-          approvalAttribute.value = approvalValue;
-          approvalAttribute.oldValue = oldApprovalValue;
-          approvalAttribute.description = approvalDescription;
-          approvalAttribute.by = accountAttribute;
-          approvalAttribute.type = ChangeKind.REWORK.toString();
-          approvalAttribute.grantedOn = approvalGrantedOn;
-
-          return new ApprovalAttribute[] {approvalAttribute};
-        };
-
-    event.author = () -> accountAttribute;
-    event.comment = commentDescription;
-
-    return event;
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
similarity index 96%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index a19134d..c919df8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
 
 import static org.mockito.Mockito.verify;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
similarity index 98%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index f1084c9..923abb2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
 
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.mockito.Mockito.verify;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
similarity index 95%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 00a239b..93daf92 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
 
 import static org.mockito.Mockito.verify;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
similarity index 96%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 11560da..605eb45 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
 
 import static org.mockito.Mockito.verify;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
deleted file mode 100644
index 2d266c2..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import org.eclipse.jgit.lib.Config;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaConfigurationTest {
-
-  private Config kafkaConfig;
-  @Mock PluginConfigFactory configFactory;
-
-  @Before
-  public void setup() {
-    kafkaConfig = new Config();
-    when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
-  }
-
-  private KafkaConfiguration getConfiguration() {
-    return new KafkaConfiguration(configFactory, "multi-site");
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    kafkaConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    kafkaConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicAliasForIndexTopic() {
-    setKafkaTopicAlias("indexEventTopic", "gerrit_index");
-    final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.INDEX_TOPIC);
-
-    assertThat(property).isEqualTo("gerrit_index");
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicAliasForStreamEventTopic() {
-    setKafkaTopicAlias("streamEventTopic", "gerrit_stream_events");
-    final String property =
-        getConfiguration().getKafka().getTopicAlias(EventTopic.STREAM_EVENT_TOPIC);
-
-    assertThat(property).isEqualTo("gerrit_stream_events");
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicAliasForProjectListEventTopic() {
-    setKafkaTopicAlias("projectListEventTopic", "gerrit_project_list");
-    final String property =
-        getConfiguration().getKafka().getTopicAlias(EventTopic.PROJECT_LIST_TOPIC);
-
-    assertThat(property).isEqualTo("gerrit_project_list");
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicAliasForCacheEventTopic() {
-    setKafkaTopicAlias("cacheEventTopic", "gerrit_cache");
-    final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.CACHE_TOPIC);
-
-    assertThat(property).isEqualTo("gerrit_cache");
-  }
-
-  private void setKafkaTopicAlias(String topicKey, String topic) {
-    kafkaConfig.setString(KAFKA_SECTION, null, topicKey, topic);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
deleted file mode 100644
index 9751269..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ /dev/null
@@ -1,300 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.util.stream.Collectors.toSet;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.acceptance.AbstractDaemonTest;
-import com.google.gerrit.acceptance.GerritConfig;
-import com.google.gerrit.acceptance.LogThreshold;
-import com.google.gerrit.acceptance.NoHttpd;
-import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.extensions.api.changes.ReviewInput;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.data.PatchSetAttribute;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.events.PatchSetCreatedEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.query.change.ChangeData;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Key;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.GitModule;
-import com.googlesource.gerrit.plugins.multisite.Module;
-import com.googlesource.gerrit.plugins.multisite.PluginModule;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.eclipse.jgit.lib.Config;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.revwalk.RevWalk;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
-import org.junit.Test;
-import org.testcontainers.containers.KafkaContainer;
-
-@NoHttpd
-@LogThreshold(level = "INFO")
-@UseLocalDisk
-public class EventConsumerIT extends AbstractDaemonTest {
-  public static final String GERRIT_CONFIG_KEY = "gerrit.installModule";
-  public static final String GERRIT_CONFIG_VALUE =
-      "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule";
-  private static final int QUEUE_POLL_TIMEOUT_MSECS = 10000;
-
-  public static class KafkaTestContainerModule extends LifecycleModule {
-
-    public static class KafkaStopAtShutdown implements LifecycleListener {
-      private final KafkaContainer kafka;
-
-      public KafkaStopAtShutdown(KafkaContainer kafka) {
-        this.kafka = kafka;
-      }
-
-      @Override
-      public void stop() {
-        kafka.stop();
-      }
-
-      @Override
-      public void start() {
-        // Do nothing
-      }
-    }
-
-    public static class TestBrokerModule extends BrokerModule {
-      @Override
-      protected void configure() {
-        DynamicItem.itemOf(binder(), BrokerApi.class);
-        bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
-
-        install(new SubscriberModule());
-      }
-    }
-
-    private final FileBasedConfig config;
-    private final Module multiSiteModule;
-    private final PluginModule pluginModule;
-    private final GitModule gitModule;
-
-    @Inject
-    public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
-      this.config =
-          new FileBasedConfig(
-              sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
-      config.setBoolean("ref-database", null, "enabled", false);
-      config.save();
-
-      Configuration multiSiteConfig = new Configuration(config, new Config());
-
-      PluginConfigFactory cfgFactory = mock(PluginConfigFactory.class);
-      when(cfgFactory.getGlobalPluginConfig("multi-site")).thenReturn(config);
-
-      this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
-      this.pluginModule = new PluginModule(multiSiteConfig, new KafkaBrokerModule());
-      this.gitModule = new GitModule(multiSiteConfig);
-    }
-
-    @Override
-    protected void configure() {
-      try {
-        final KafkaContainer kafka = startAndConfigureKafkaConnection();
-
-        listener().toInstance(new KafkaStopAtShutdown(kafka));
-
-        install(multiSiteModule);
-        install(pluginModule);
-        install(gitModule);
-
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-
-    private KafkaContainer startAndConfigureKafkaConnection() throws IOException {
-      KafkaContainer kafkaContainer = new KafkaContainer();
-      kafkaContainer.start();
-
-      Config kafkaConfig = new Config();
-      kafkaConfig.setString(
-          "kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
-
-      PluginConfigFactory configFactory = mock(PluginConfigFactory.class);
-      when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
-      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configFactory, "multi-site");
-      bind(KafkaConfiguration.class).toInstance(kafkaConfiguration);
-
-      listener().toInstance(new KafkaStopAtShutdown(kafkaContainer));
-
-      return kafkaContainer;
-    }
-  }
-
-  @Test
-  @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
-  public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
-    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
-    drainQueue(droppedEventsQueue);
-
-    ChangeData change = createChange().getChange();
-    String project = change.project().get();
-    int changeNum = change.getId().get();
-    String changeNotesRef = change.notes().getRefName();
-    int patchsetNum = change.currentPatchSet().id().get();
-    String patchsetRevision = change.currentPatchSet().commitId().name();
-    String patchsetRef = change.currentPatchSet().refName();
-
-    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
-    assertThat(eventsByType).isNotEmpty();
-
-    assertThat(eventsByType.get("change-index"))
-        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
-    assertThat(
-            eventsByType.get("ref-updated").stream()
-                .map(e -> ((RefUpdatedEvent) e).getRefName())
-                .collect(toSet()))
-        .containsAtLeast(changeNotesRef, patchsetRef); // 'refs/sequences/changes'
-    // not always updated thus
-    // not checked
-
-    List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
-    assertThat(patchSetCreatedEvents).hasSize(1);
-    assertPatchSetAttributes(
-        (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
-        patchsetNum,
-        patchsetRevision,
-        patchsetRef);
-  }
-
-  private void assertPatchSetAttributes(
-      PatchSetCreatedEvent patchSetCreated,
-      int patchsetNum,
-      String patchsetRevision,
-      String patchsetRef) {
-    PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
-    assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
-    assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
-    assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
-  }
-
-  @Test
-  @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
-  public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
-    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
-    ChangeData change = createChange().getChange();
-    String project = change.project().get();
-    int changeNum = change.getId().get();
-    drainQueue(droppedEventsQueue);
-
-    ReviewInput in = ReviewInput.recommend();
-    in.message = "LGTM";
-    gApi.changes().id(changeNum).revision("current").review(in);
-
-    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
-
-    assertThat(eventsByType).isNotEmpty();
-
-    assertThat(eventsByType.get("change-index"))
-        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
-    List<Event> commentAddedEvents = eventsByType.get("comment-added");
-    assertThat(commentAddedEvents).hasSize(1);
-    assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
-        .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
-  }
-
-  private String getParentCommit(ChangeData change) throws Exception {
-    RevCommit parent;
-    try (Repository repo = repoManager.openRepository(change.project());
-        RevWalk walk = new RevWalk(repo)) {
-      RevCommit commit = walk.parseCommit(change.currentPatchSet().commitId());
-      parent = commit.getParent(0);
-    }
-    return parent.getId().name();
-  }
-
-  private ChangeIndexEvent createChangeIndexEvent(
-      String projectName, int changeId, String targetSha1) {
-    ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
-    event.targetSha = targetSha1;
-    return event;
-  }
-
-  private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
-    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();
-
-    TypeLiteral<DynamicSet<DroppedEventListener>> type =
-        new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
-    server
-        .getTestInjector()
-        .getInstance(Key.get(type))
-        .add(
-            "multi-site",
-            new DroppedEventListener() {
-              @Override
-              public void onEventDropped(SourceAwareEventWrapper event) {
-                droppedEvents.offer(event);
-              }
-            });
-    return droppedEvents;
-  }
-
-  private Map<String, List<Event>> receiveEventsByType(
-      LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
-    return drainQueue(queue).stream()
-        .sorted(Comparator.comparing(e -> e.type))
-        .collect(Collectors.groupingBy(e -> e.type));
-  }
-
-  private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
-      throws InterruptedException {
-    Gson gson = server.getTestInjector().getInstance(Key.get(Gson.class, EventGson.class));
-    SourceAwareEventWrapper event;
-    List<Event> eventsList = new ArrayList<>();
-    while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
-      eventsList.add(event.getEventBody(gson));
-    }
-    return eventsList;
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
deleted file mode 100644
index 76ad452..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KafkaEventDeserializerTest {
-  private KafkaEventDeserializer deserializer;
-
-  @Before
-  public void setUp() {
-    final Gson gson = new EventGsonProvider().get();
-    deserializer = new KafkaEventDeserializer(gson);
-  }
-
-  @Test
-  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
-    final UUID eventId = UUID.randomUUID();
-    final String eventType = "event-type";
-    final UUID sourceInstanceId = UUID.randomUUID();
-    final long eventCreatedOn = 10L;
-    final String eventJson =
-        String.format(
-            "{ "
-                + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
-                + "\"body\": {}"
-                + "}",
-            eventId, eventType, sourceInstanceId, eventCreatedOn);
-    final SourceAwareEventWrapper event =
-        deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
-
-    assertThat(event.getBody().entrySet()).isEmpty();
-    assertThat(event.getHeader().getEventId()).isEqualTo(eventId);
-    assertThat(event.getHeader().getEventType()).isEqualTo(eventType);
-    assertThat(event.getHeader().getSourceInstanceId()).isEqualTo(sourceInstanceId);
-    assertThat(event.getHeader().getEventCreatedOn()).isEqualTo(eventCreatedOn);
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void kafkaEventDeserializerShouldFailForInvalidJson() {
-    deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
-    deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
-  }
-}