Merge branch 'stable-3.0'

* stable-3.0:
  Move DroppedEventListener out of kafka domain
  Merge Kafka-related modules into single KafkaBrokerModule
  Decouple the multi-site engine from broker implementation
  Fix NPE when source ref is null
  Fix NPE when source ref is null
  Fix issue with Kafka topic alias resolving
  Replicate projects in local setup
  Cleanup unused forwarder interfaces

Change-Id: Iffab0e08a3163b2fc3817341fe47d5edd5cfeee4
diff --git a/setup_local_env/configs/replication.config b/setup_local_env/configs/replication.config
index ea9252f..cf7f66a 100644
--- a/setup_local_env/configs/replication.config
+++ b/setup_local_env/configs/replication.config
@@ -4,6 +4,9 @@
     timeout = 600
     rescheduleDelay = 15
     replicationDelay = $REPLICATION_DELAY_SEC
+    createMissingRepositories = true
+    replicateProjectDeletions = true
+    replicateHiddenProjects = true
 [gerrit]
     autoReload = true
     replicateOnStartup = false
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 30c188a..e9dcef7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -25,16 +25,12 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
@@ -52,17 +48,12 @@
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
-  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
-  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+  private BrokerModule brokerModule;
 
   @Inject
-  public Module(
-      Configuration config,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule) {
+  public Module(Configuration config, BrokerModule brokerModule) {
     this.config = config;
-    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
-    this.kafkaBrokerForwarderModule = brokerForwarderModule;
+    this.brokerModule = brokerModule;
   }
 
   @Override
@@ -94,15 +85,10 @@
       install(new IndexModule());
     }
 
-    install(new BrokerModule());
-    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
-    listener().to(KafkaBrokerApi.class);
+    install(brokerModule);
 
     install(new RouterModule());
 
-    install(kafkaForwardedEventRouterModule);
-    install(kafkaBrokerForwarderModule);
-
     if (config.getSharedRefDb().isEnabled()) {
       DynamicSet.bind(binder(), ProjectDeletedListener.class)
           .to(ProjectDeletedSharedDbCleanup.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 22a8d2d..b27ae01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,8 +15,13 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class PluginModule extends LifecycleModule {
@@ -24,20 +29,29 @@
 
   private Configuration config;
   private ZkValidationModule zkValidationModule;
+  private KafkaBrokerModule kafkaBrokerModule;
 
   @Inject
-  public PluginModule(Configuration config, ZkValidationModule zkValidationModule) {
+  public PluginModule(
+      Configuration config,
+      ZkValidationModule zkValidationModule,
+      KafkaBrokerModule kafkaBrokerModule) {
     this.config = config;
     this.zkValidationModule = zkValidationModule;
+    this.kafkaBrokerModule = kafkaBrokerModule;
   }
 
   @Override
   protected void configure() {
-    listener().to(PluginStartup.class);
-
     if (config.getSharedRefDb().isEnabled()) {
+      listener().to(PluginStartup.class);
       logger.atInfo().log("Shared ref-db engine: Zookeeper");
       install(zkValidationModule);
     }
+
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+    listener().to(KafkaBrokerApi.class);
+
+    install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
similarity index 60%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
index 0e50ea9..19cf0f7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -12,8 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.forwarder.router;
+package com.googlesource.gerrit.plugins.multisite.broker;
 
 import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
 
-public interface ForwardedStreamEventRouter extends ForwardedEventRouter<Event> {}
+public class BrokerApiNoOp implements BrokerApi {
+
+  @Override
+  public boolean send(String topic, Event event) {
+    return true;
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index f1ab1f6..6983984 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -24,6 +24,7 @@
   @Override
   protected void configure() {
     DynamicItem.itemOf(binder(), BrokerApi.class);
+    DynamicItem.bind(binder(), BrokerApi.class).to(BrokerApiNoOp.class).in(Scopes.SINGLETON);
 
     bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
deleted file mode 100644
index a49aa8b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-
-public class KafkaBrokerForwarderModule extends LifecycleModule {
-  private final KafkaConfiguration config;
-
-  @Inject
-  public KafkaBrokerForwarderModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-    if (config.kafkaPublisher().enabled()) {
-      listener().to(BrokerPublisher.class);
-      bind(BrokerSession.class).to(KafkaSession.class);
-
-      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
-        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
-        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-            .to(BrokerCacheEvictionForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-            .to(BrokerProjectListUpdateForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 99c533d..3d1046f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -23,6 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
@@ -41,7 +42,7 @@
   private SubscriberMetrics subscriberMetrics;
 
   public AbstractSubcriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index fc352da..53aae99 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -16,14 +16,12 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
@@ -32,7 +30,7 @@
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 46a34b5..eacccbf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -16,14 +16,12 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import java.util.UUID;
@@ -32,7 +30,7 @@
 public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 6f22566..4fa7f64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -16,14 +16,12 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import java.util.UUID;
@@ -32,7 +30,7 @@
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 9cbda49..2918657 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -16,14 +16,12 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
@@ -32,7 +30,7 @@
 public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index 940ad1f..0a0c350 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -32,5 +32,6 @@
     listener().to(MultiSiteConsumerRunner.class);
 
     DynamicSet.setOf(binder(), AbstractSubcriber.class);
+    DynamicSet.setOf(binder(), DroppedEventListener.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 24b29b7..7d42acc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -14,28 +14,32 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
-import com.google.common.base.CaseFormat;
-
 public enum EventTopic {
-  INDEX_TOPIC("GERRIT.EVENT.INDEX"),
-  CACHE_TOPIC("GERRIT.EVENT.CACHE"),
-  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
-  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
+  INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+  CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
+  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
+  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
 
   private final String topic;
+  private final String aliasKey;
 
-  private EventTopic(String topic) {
+  private EventTopic(String topic, String aliasKey) {
     this.topic = topic;
-  }
-
-  public String lowerCamelName() {
-    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
+    this.aliasKey = aliasKey;
   }
 
   public String topic() {
     return topic;
   }
 
+  public String topicAliasKey() {
+    return aliasKey + "Topic";
+  }
+
+  public String enabledKey() {
+    return aliasKey + "Enabled";
+  }
+
   public static EventTopic of(String topicString) {
     EventTopic[] topics = EventTopic.values();
     for (EventTopic topic : topics) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
index eaf1ff7..464ceef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 
 @Singleton
-public class CacheEvictionEventRouter implements ForwardedCacheEvictionEventRouter {
+public class CacheEvictionEventRouter implements ForwardedEventRouter<CacheEvictionEvent> {
   private final ForwardedCacheEvictionHandler cacheEvictionHanlder;
   private final GsonParser gsonParser;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
deleted file mode 100644
index 79ab0b9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.router;
-
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-
-public interface ForwardedCacheEvictionEventRouter
-    extends ForwardedEventRouter<CacheEvictionEvent> {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
deleted file mode 100644
index a843fcc..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.router;
-
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
-
-public interface ForwardedIndexEventRouter extends ForwardedEventRouter<IndexEvent> {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
deleted file mode 100644
index 37cc3de..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.router;
-
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
-
-public interface ForwardedProjectListUpdateRouter
-    extends ForwardedEventRouter<ProjectListUpdateEvent> {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 1e21ae2..34a9155 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -34,7 +34,7 @@
 import java.util.Optional;
 
 @Singleton
-public class IndexEventRouter implements ForwardedIndexEventRouter {
+public class IndexEventRouter implements ForwardedEventRouter<IndexEvent> {
   private final ForwardedIndexAccountHandler indexAccountHandler;
   private final ForwardedIndexChangeHandler indexChangeHandler;
   private final ForwardedIndexGroupHandler indexGroupHandler;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
index 2248c4c..0fcebc2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 
 @Singleton
-public class ProjectListUpdateRouter implements ForwardedProjectListUpdateRouter {
+public class ProjectListUpdateRouter implements ForwardedEventRouter<ProjectListUpdateEvent> {
   ForwardedProjectListUpdateHandler projectListUpdateHandler;
 
   @Inject
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
index 6f1572a..2c29d1f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
@@ -19,9 +19,9 @@
 public class RouterModule extends AbstractModule {
   @Override
   protected void configure() {
-    bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
-    bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
-    bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
-    bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
+    bind(IndexEventRouter.class);
+    bind(CacheEvictionEventRouter.class);
+    bind(ProjectListUpdateRouter.class);
+    bind(StreamEventRouter.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index a330416..aa8752e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -21,7 +21,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
 
 @Singleton
-public class StreamEventRouter implements ForwardedStreamEventRouter {
+public class StreamEventRouter implements ForwardedEventRouter<Event> {
   private final ForwardedEventHandler streamEventHandler;
 
   @Inject
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
new file mode 100644
index 0000000..5fdb07e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -0,0 +1,92 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/** Binds Kafka subscribers and broker forwarders for the topics enabled in KafkaConfiguration. */
+public class KafkaBrokerModule extends LifecycleModule {
+  private final KafkaConfiguration config;
+  @Inject
+  public KafkaBrokerModule(KafkaConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  protected void configure() {
+    if (config.kafkaSubscriber().enabled()) {
+      bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+      bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+          .to(KafkaEventDeserializer.class);
+      // One AbstractSubcriber is registered per topic enabled for the subscriber.
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
+      }
+    }
+
+    if (config.kafkaPublisher().enabled()) {
+      listener().to(BrokerPublisher.class);
+      bind(BrokerSession.class).to(KafkaSession.class);
+      // One broker-backed forwarder is registered per topic enabled for the publisher.
+      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+            .to(BrokerCacheEvictionForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+            .to(BrokerProjectListUpdateForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index baec520..97528de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -20,7 +20,6 @@
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -110,13 +109,12 @@
       Supplier<Config> config, String subsection) {
     Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
     for (EventTopic topic : EventTopic.values()) {
-      String enabledConfigKey = topic.lowerCamelName() + "Enabled";
-
       eventsEnabled.put(
           topic,
           config
               .get()
-              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
+              .getBoolean(
+                  KAFKA_SECTION, subsection, topic.enabledKey(), DEFAULT_ENABLE_PROCESSING));
     }
     return eventsEnabled;
   }
@@ -147,28 +145,16 @@
     private final Map<EventTopic, String> eventTopics;
     private final String bootstrapServers;
 
-    private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
-        ImmutableMap.of(
-            EventTopic.INDEX_TOPIC,
-            "GERRIT.EVENT.INDEX",
-            EventTopic.STREAM_EVENT_TOPIC,
-            "GERRIT.EVENT.STREAM",
-            EventTopic.CACHE_TOPIC,
-            "GERRIT.EVENT.CACHE",
-            EventTopic.PROJECT_LIST_TOPIC,
-            "GERRIT.EVENT.PROJECT.LIST");
-
     Kafka(Supplier<Config> config) {
       this.bootstrapServers =
           getString(
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
 
       this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
-        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+      for (EventTopic eventTopic : EventTopic.values()) {
         eventTopics.put(
-            topicDefault.getKey(),
-            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+            eventTopic,
+            getString(config, KAFKA_SECTION, null, eventTopic.topicAliasKey(), eventTopic.topic()));
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
deleted file mode 100644
index b7bcabd..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaConsumerModule extends LifecycleModule {
-
-  private KafkaConfiguration config;
-
-  @Inject
-  public KafkaConsumerModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-
-    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
-        .to(KafkaEventDeserializer.class);
-
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-    }
-
-    DynamicSet.setOf(binder(), DroppedEventListener.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
deleted file mode 100644
index 2e05de8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-
-public class KafkaForwardedEventRouterModule extends AbstractModule {
-  private KafkaConfiguration kafkaConfig;
-  private KafkaConsumerModule kafkaConsumerModule;
-
-  @Inject
-  public KafkaForwardedEventRouterModule(
-      KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
-    this.kafkaConfig = config;
-    this.kafkaConsumerModule = kafkaConsumerModule;
-  }
-
-  @Override
-  protected void configure() {
-    if (kafkaConfig.kafkaSubscriber().enabled()) {
-      install(kafkaConsumerModule);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 94a0a9b..5f25607 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -84,7 +84,7 @@
   }
 
   private String changePrefix(String changeRef) {
-    if (!changeRef.startsWith("refs/changes")) {
+    if (changeRef == null || !changeRef.startsWith("refs/changes")) {
       return changeRef;
     }
     if (changeRef.endsWith(REF_META_SUFFIX)) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 59df7bb..10b4adb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,8 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.gerrit.server.config.SitePaths;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -38,8 +37,7 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
-  @Mock private KafkaForwardedEventRouterModule routerModule;
-  @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+  @Mock private BrokerModule brokerModule;
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -47,7 +45,7 @@
 
   @Before
   public void setup() {
-    module = new Module(configMock, routerModule, brokerForwarderModule);
+    module = new Module(configMock, brokerModule);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ba423ff..84e1dd8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -22,6 +22,7 @@
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -121,4 +122,79 @@
 
     assertThat(property).isNull();
   }
+
+  /** A configured {@code indexEventTopic} alias is returned for the index topic. */
+  @Test
+  public void shouldReturnKafkaTopicAliasForIndexTopic() {
+    setKafkaTopicAlias("indexEventTopic", "gerrit_index");
+    String indexAlias = getConfiguration().getKafka().getTopicAlias(EventTopic.INDEX_TOPIC);
+    assertThat(indexAlias).isEqualTo("gerrit_index");
+  }
+
+  /** A configured {@code streamEventTopic} alias is returned for the stream-event topic. */
+  @Test
+  public void shouldReturnKafkaTopicAliasForStreamEventTopic() {
+    String expectedAlias = "gerrit_stream_events";
+    setKafkaTopicAlias("streamEventTopic", expectedAlias);
+    assertThat(getConfiguration().getKafka().getTopicAlias(EventTopic.STREAM_EVENT_TOPIC))
+        .isEqualTo(expectedAlias);
+  }
+
+  /** A configured {@code projectListEventTopic} alias is returned for the project-list topic. */
+  @Test
+  public void shouldReturnKafkaTopicAliasForProjectListEventTopic() {
+    String expectedAlias = "gerrit_project_list";
+    setKafkaTopicAlias("projectListEventTopic", expectedAlias);
+    assertThat(getConfiguration().getKafka().getTopicAlias(EventTopic.PROJECT_LIST_TOPIC))
+        .isEqualTo(expectedAlias);
+  }
+
+  /** A configured {@code cacheEventTopic} alias is returned for the cache topic. */
+  @Test
+  public void shouldReturnKafkaTopicAliasForCacheEventTopic() {
+    setKafkaTopicAlias("cacheEventTopic", "gerrit_cache");
+    String cacheAlias = getConfiguration().getKafka().getTopicAlias(EventTopic.CACHE_TOPIC);
+    assertThat(cacheAlias).isEqualTo("gerrit_cache");
+  }
+
+  /** A false {@code cacheEventEnabled} disables the publisher's cache topic. */
+  @Test
+  public void shouldReturnKafkaTopicEnabledForCacheEventTopic() {
+    setKafkaTopicEnabled("cacheEventEnabled", false);
+    assertThat(getConfiguration().kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC))
+        .isFalse();
+  }
+
+  /** A false {@code indexEventEnabled} disables the publisher's index topic. */
+  @Test
+  public void shouldReturnKafkaTopicEnabledForIndexTopic() {
+    setKafkaTopicEnabled("indexEventEnabled", false);
+    assertThat(getConfiguration().kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC))
+        .isFalse();
+  }
+
+  /** A false {@code streamEventEnabled} disables the publisher's stream-event topic. */
+  @Test
+  public void shouldReturnKafkaTopicEnabledForStreamEventTopic() {
+    setKafkaTopicEnabled("streamEventEnabled", false);
+    assertThat(getConfiguration().kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC))
+        .isFalse();
+  }
+
+  /** A false {@code projectListEventEnabled} disables the publisher's project-list topic. */
+  @Test
+  public void shouldReturnKafkaTopicEnabledForProjectListEventTopic() {
+    setKafkaTopicEnabled("projectListEventEnabled", false);
+    assertThat(getConfiguration().kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC))
+        .isFalse();
+  }
+
+  private void setKafkaTopicAlias(String topicKey, String topic) {
+    globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic); // null: no subsection
+  }
+
+  private void setKafkaTopicEnabled(String topicEnabledKey, boolean isEnabled) {
+    globalPluginConfig.setBoolean(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, topicEnabledKey, isEnabled); // publisher only
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 5683bfb..370d6da 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
@@ -37,15 +38,22 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.GitModule;
 import com.googlesource.gerrit.plugins.multisite.Module;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.PluginModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -92,8 +100,20 @@
       }
     }
 
+    public static class TestBrokerModule extends BrokerModule {
+      @Override
+      protected void configure() {
+        DynamicItem.itemOf(binder(), BrokerApi.class);
+        bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+        // No super.configure(): this module supplies its own BrokerModule wiring for the IT.
+        install(new SubscriberModule());
+      }
+    }
+
     private final FileBasedConfig config;
     private final Module multiSiteModule;
+    private final PluginModule pluginModule;
+    private final GitModule gitModule;
 
     @Inject
     public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
@@ -106,13 +126,13 @@
       config.save();
 
       Configuration multiSiteConfig = new Configuration(config, new Config());
-      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
-      this.multiSiteModule =
-          new Module(
+      this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
+      this.pluginModule =
+          new PluginModule(
               multiSiteConfig,
-              new KafkaForwardedEventRouterModule(
-                  kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
-              new KafkaBrokerForwarderModule(kafkaConfiguration));
+              new ZkValidationModule(multiSiteConfig),
+              new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
+      this.gitModule = new GitModule(multiSiteConfig);
     }
 
     @Override
@@ -123,6 +143,8 @@
         listener().toInstance(new KafkaStopAtShutdown(kafka));
 
         install(multiSiteModule);
+        install(pluginModule);
+        install(gitModule);
 
       } catch (IOException e) {
         throw new IllegalStateException(e);