Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  Move DroppedEventListener out of kafka domain
  Merge Kafka-related modules into single KafkaBrokerModule
  Decouple the multi-site engine from broker implementation
  Fix NPE when source ref is null

Change-Id: I546786507c663230ac573ac0ab27face85ad917d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 03bff94..89c2253 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,9 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gson.Gson;
@@ -26,19 +24,14 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
 import java.io.BufferedReader;
@@ -55,17 +48,12 @@
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
-  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
-  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+  private BrokerModule brokerModule;
 
   @Inject
-  public Module(
-      Configuration config,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule) {
+  public Module(Configuration config, BrokerModule brokerModule) {
     this.config = config;
-    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
-    this.kafkaBrokerForwarderModule = brokerForwarderModule;
+    this.brokerModule = brokerModule;
   }
 
   @Override
@@ -97,20 +85,10 @@
       install(new IndexModule());
     }
 
-    install(new BrokerModule());
-    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
-    listener().to(KafkaBrokerApi.class);
+    install(brokerModule);
 
     install(new RouterModule());
 
-    install(kafkaForwardedEventRouterModule);
-    install(kafkaBrokerForwarderModule);
-
-    if (config.getSharedRefDb().isEnabled()) {
-      DynamicSet.bind(binder(), ProjectDeletedListener.class)
-          .to(ProjectDeletedSharedDbCleanup.class);
-    }
-
     bind(Gson.class)
         .annotatedWith(BrokerGson.class)
         .toProvider(GsonProvider.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 22a8d2d..b27ae01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,8 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class PluginModule extends LifecycleModule {
@@ -24,20 +29,29 @@
 
   private Configuration config;
   private ZkValidationModule zkValidationModule;
+  private KafkaBrokerModule kafkaBrokerModule;
 
   @Inject
-  public PluginModule(Configuration config, ZkValidationModule zkValidationModule) {
+  public PluginModule(
+      Configuration config,
+      ZkValidationModule zkValidationModule,
+      KafkaBrokerModule kafkaBrokerModule) {
     this.config = config;
     this.zkValidationModule = zkValidationModule;
+    this.kafkaBrokerModule = kafkaBrokerModule;
   }
 
   @Override
   protected void configure() {
-    listener().to(PluginStartup.class);
-
     if (config.getSharedRefDb().isEnabled()) {
+      listener().to(PluginStartup.class);
       logger.atInfo().log("Shared ref-db engine: Zookeeper");
       install(zkValidationModule);
     }
+
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+    listener().to(KafkaBrokerApi.class);
+
+    install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
new file mode 100644
index 0000000..19cf0f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+public class BrokerApiNoOp implements BrokerApi {
+
+  @Override
+  public boolean send(String topic, Event event) {
+    return true;
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index f1ab1f6..6983984 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -24,6 +24,7 @@
   @Override
   protected void configure() {
     DynamicItem.itemOf(binder(), BrokerApi.class);
+    DynamicItem.bind(binder(), BrokerApi.class).to(BrokerApiNoOp.class).in(Scopes.SINGLETON);
 
     bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
deleted file mode 100644
index a49aa8b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-
-public class KafkaBrokerForwarderModule extends LifecycleModule {
-  private final KafkaConfiguration config;
-
-  @Inject
-  public KafkaBrokerForwarderModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-    if (config.kafkaPublisher().enabled()) {
-      listener().to(BrokerPublisher.class);
-      bind(BrokerSession.class).to(KafkaSession.class);
-
-      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
-        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
-        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-            .to(BrokerCacheEvictionForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-            .to(BrokerProjectListUpdateForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 8e41690..7854aab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -22,6 +22,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@@ -41,7 +42,7 @@
   private SubscriberMetrics subscriberMetrics;
 
   public AbstractSubcriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index fc62e6d..d096148 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index c201f65..df55040 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@@ -30,7 +30,7 @@
 public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4157609..5c42ea6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
@@ -30,7 +30,7 @@
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index af0f3e7..b48ab31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
 public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index 940ad1f..0a0c350 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -32,5 +32,6 @@
     listener().to(MultiSiteConsumerRunner.class);
 
     DynamicSet.setOf(binder(), AbstractSubcriber.class);
+    DynamicSet.setOf(binder(), DroppedEventListener.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
new file mode 100644
index 0000000..5fdb07e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -0,0 +1,92 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaBrokerModule extends LifecycleModule {
+  private KafkaConfiguration config;
+
+  @Inject
+  public KafkaBrokerModule(KafkaConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  protected void configure() {
+    if (config.kafkaSubscriber().enabled()) {
+      bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+      bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+          .to(KafkaEventDeserializer.class);
+
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
+      }
+    }
+
+    if (config.kafkaPublisher().enabled()) {
+      listener().to(BrokerPublisher.class);
+      bind(BrokerSession.class).to(KafkaSession.class);
+
+      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+            .to(BrokerCacheEvictionForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+            .to(BrokerProjectListUpdateForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
deleted file mode 100644
index b7bcabd..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaConsumerModule extends LifecycleModule {
-
-  private KafkaConfiguration config;
-
-  @Inject
-  public KafkaConsumerModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-
-    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
-        .to(KafkaEventDeserializer.class);
-
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-    }
-
-    DynamicSet.setOf(binder(), DroppedEventListener.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
deleted file mode 100644
index 2e05de8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-
-public class KafkaForwardedEventRouterModule extends AbstractModule {
-  private KafkaConfiguration kafkaConfig;
-  private KafkaConsumerModule kafkaConsumerModule;
-
-  @Inject
-  public KafkaForwardedEventRouterModule(
-      KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
-    this.kafkaConfig = config;
-    this.kafkaConsumerModule = kafkaConsumerModule;
-  }
-
-  @Override
-  protected void configure() {
-    if (kafkaConfig.kafkaSubscriber().enabled()) {
-      install(kafkaConsumerModule);
-    }
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 59df7bb..10b4adb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,8 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.gerrit.server.config.SitePaths;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -38,8 +37,7 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
-  @Mock private KafkaForwardedEventRouterModule routerModule;
-  @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+  @Mock private BrokerModule brokerModule;
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -47,7 +45,7 @@
 
   @Before
   public void setup() {
-    module = new Module(configMock, routerModule, brokerForwarderModule);
+    module = new Module(configMock, brokerModule);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index d1da1f0..1d3db68 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
@@ -36,16 +37,23 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.GitModule;
 import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.PluginModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -93,8 +101,20 @@
       }
     }
 
+    public static class TestBrokerModule extends BrokerModule {
+      @Override
+      protected void configure() {
+        DynamicItem.itemOf(binder(), BrokerApi.class);
+        bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+        install(new SubscriberModule());
+      }
+    }
+
     private final FileBasedConfig config;
     private final Module multiSiteModule;
+    private final PluginModule pluginModule;
+    private final GitModule gitModule;
 
     @Inject
     public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
@@ -107,13 +127,13 @@
       config.save();
 
       Configuration multiSiteConfig = new Configuration(config, new Config());
-      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
-      this.multiSiteModule =
-          new Module(
+      this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
+      this.pluginModule =
+          new PluginModule(
               multiSiteConfig,
-              new KafkaForwardedEventRouterModule(
-                  kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
-              new KafkaBrokerForwarderModule(kafkaConfiguration));
+              new ZkValidationModule(multiSiteConfig),
+              new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
+      this.gitModule = new GitModule(multiSiteConfig);
     }
 
     @Override
@@ -124,6 +144,8 @@
         listener().toInstance(new KafkaStopAtShutdown(kafka));
 
         install(multiSiteModule);
+        install(pluginModule);
+        install(gitModule);
 
       } catch (IOException e) {
         throw new IllegalStateException(e);