Kafka module for multi-site

Extract Kafka related code from main module to a separate Kafka modules.
It will help us to move it later on to a different component.

Feature: Issue 10825
Change-Id: Id1b92ff4e9f108879b32f5c1ba8aa5f65cb31d15
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 7348137..ca01471 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -45,19 +45,19 @@
 
   public static final String PLUGIN_NAME = "multi-site";
   public static final String MULTI_SITE_CONFIG = PLUGIN_NAME + ".config";
-  public static final String REPLICATION_CONFIG = "replication.config";
 
   static final String INSTANCE_ID_FILE = "instanceId.data";
-
-  // common parameters to cache and index sections
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
-  static final int DEFAULT_INDEX_MAX_TRIES = 2;
-  static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
-  static final String NUM_STRIPED_LOCKS = "numStripedLocks";
-  static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
   static final String ENABLE_KEY = "enabled";
 
+  private static final String REPLICATION_CONFIG = "replication.config";
+  // common parameters to cache and index sections
+  private static final int DEFAULT_INDEX_MAX_TRIES = 2;
+  private static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
+  private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
+  private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 8801f11..156ce27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -26,13 +26,12 @@
 import com.google.inject.spi.Message;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.BufferedReader;
@@ -50,12 +49,17 @@
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
   private NoteDbStatus noteDb;
+  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
+  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
   private final boolean disableGitRepositoryValidation;
-  private KafkaConfiguration kafkaConfig;
 
   @Inject
-  public Module(Configuration config, NoteDbStatus noteDb, KafkaConfiguration kafkaConfig) {
-    this(config, noteDb, kafkaConfig, false);
+  public Module(
+      Configuration config,
+      NoteDbStatus noteDb,
+      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule) {
+    this(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule, false);
   }
 
   // TODO: It is not possible to properly test the libModules in Gerrit.
@@ -66,16 +70,22 @@
   public Module(
       Configuration config,
       NoteDbStatus noteDb,
-      KafkaConfiguration kafkaConfig,
+      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule,
       boolean disableGitRepositoryValidation) {
-    init(config, noteDb, kafkaConfig);
+    init(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule);
     this.disableGitRepositoryValidation = disableGitRepositoryValidation;
   }
 
-  private void init(Configuration config, NoteDbStatus noteDb, KafkaConfiguration kafkaConfig) {
+  private void init(
+      Configuration config,
+      NoteDbStatus noteDb,
+      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule) {
     this.config = config;
     this.noteDb = noteDb;
-    this.kafkaConfig = kafkaConfig;
+    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
+    this.kafkaBrokerForwarderModule = brokerForwarderModule;
   }
 
   @Override
@@ -106,13 +116,9 @@
       install(new IndexModule());
     }
 
-    if (kafkaConfig.kafkaSubscriber().enabled()) {
-      install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
-      install(new ForwardedEventRouterModule());
-    }
-    if (kafkaConfig.kafkaPublisher().enabled()) {
-      install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
-    }
+    install(kafkaForwardedEventRouterModule);
+
+    install(kafkaBrokerForwarderModule);
 
     install(
         new ValidationModule(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
new file mode 100644
index 0000000..62bdcb0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+
+public class KafkaBrokerForwarderModule extends LifecycleModule {
+  private final KafkaConfiguration config;
+
+  @Inject
+  public KafkaBrokerForwarderModule(KafkaConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  protected void configure() {
+    if (config.kafkaPublisher().enabled()) {
+      listener().to(BrokerPublisher.class);
+      bind(BrokerSession.class).to(KafkaSession.class);
+
+      if (config.kafkaPublisher().enabledEvent(EventFamily.INDEX_EVENT)) {
+        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventFamily.CACHE_EVENT)) {
+        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+            .to(BrokerCacheEvictionForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+            .to(BrokerProjectListUpdateForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventFamily.STREAM_EVENT)) {
+        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 7c83346..aaf329a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -16,9 +16,9 @@
 
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
deleted file mode 100644
index b7ee20b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-
-public class BrokerForwarderModule extends LifecycleModule {
-  private final KafkaPublisher kafkaPublisher;
-
-  public BrokerForwarderModule(KafkaPublisher kafkaPublisher) {
-    this.kafkaPublisher = kafkaPublisher;
-  }
-
-  @Override
-  protected void configure() {
-    listener().to(BrokerPublisher.class);
-    bind(BrokerSession.class).to(KafkaSession.class);
-
-    if (kafkaPublisher.enabledEvent(EventFamily.INDEX_EVENT)) {
-      DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.CACHE_EVENT)) {
-      DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-          .to(BrokerCacheEvictionForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
-      DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-          .to(BrokerProjectListUpdateForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.STREAM_EVENT)) {
-      DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 808245f..cb05a4e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
 @Singleton
-class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
+public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
   private final BrokerPublisher publisher;
 
   @Inject
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
similarity index 95%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
index c2a06d2..e07026d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
index 7f740d5..79ab0b9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
similarity index 93%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
index 616b40a..139020b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.server.OrmException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
index 59dcaa4..a843fcc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
similarity index 92%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
index 1df3129..37cc3de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
index c96ab1a..0e50ea9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.events.Event;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
similarity index 98%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index e859f40..5abf527 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.DELETE;
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
similarity index 95%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
index 2cce12c..2248c4c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
similarity index 94%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index ed65662..adb9e87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.permissions.PermissionBackendException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index b88699a..ea16bc3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -12,19 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.base.Suppliers.memoize;
 import static com.google.common.base.Suppliers.ofInstance;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,7 +34,6 @@
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +41,11 @@
 public class KafkaConfiguration {
 
   private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
-  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
   static final String KAFKA_SECTION = "kafka";
   static final String ENABLE_KEY = "enabled";
-  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  static final boolean DEFAULT_ENABLE_PROCESSING = true;
+  private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+  private static final boolean DEFAULT_ENABLE_PROCESSING = true;
   private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
 
   private final Supplier<KafkaSubscriber> subscriber;
@@ -55,22 +53,13 @@
   private final Supplier<KafkaPublisher> publisher;
 
   @Inject
-  KafkaConfiguration(SitePaths sitePaths) {
-    this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
-  }
-
-  @VisibleForTesting
-  public KafkaConfiguration(Config kafkaConfig) {
-    Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+  public KafkaConfiguration(Configuration configuration) {
+    Supplier<Config> lazyCfg = lazyLoad(configuration.getMultiSiteConfig());
     kafka = memoize(() -> new Kafka(lazyCfg));
     publisher = memoize(() -> new KafkaPublisher(lazyCfg));
     subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
   }
 
-  private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
-    return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
-  }
-
   public Kafka getKafka() {
     return kafka.get();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index b56c8c2..e3ca413 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -24,12 +24,12 @@
 import com.google.gson.Gson;
 import com.google.gwtorm.server.OrmException;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 9020486..09938db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -20,11 +20,11 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
similarity index 87%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index 12b2790..ca07e78 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -20,18 +20,18 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
-  public CacheEvictionEventSubscriber(
+  public KafkaCacheEvictionEventSubscriber(
       KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 3e0e5d7..cc8a849 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -16,10 +16,11 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -27,10 +28,11 @@
 
 public class KafkaConsumerModule extends LifecycleModule {
 
-  private final KafkaSubscriber kafkaSubscriber;
+  private KafkaConfiguration config;
 
-  public KafkaConsumerModule(KafkaSubscriber kafkaSubscriber) {
-    this.kafkaSubscriber = kafkaSubscriber;
+  @Inject
+  public KafkaConsumerModule(KafkaConfiguration config) {
+    this.config = config;
   }
 
   @Override
@@ -47,17 +49,17 @@
 
     DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
 
-    if (kafkaSubscriber.enabledEvent(EventFamily.INDEX_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.STREAM_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.CACHE_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(CacheEvictionEventSubscriber.class);
+          .to(KafkaCacheEvictionEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(ProjectUpdateEventSubscriber.class);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 9e5db3a..2ba33e4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -20,11 +20,11 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 66070b6..77f3c85 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -20,11 +20,11 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
deleted file mode 100644
index dd11180..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-
-public class ForwardedEventRouterModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
-    bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
-    bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
-    bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
new file mode 100644
index 0000000..a6422ae
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.router;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedCacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedIndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedStreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
+
+public class KafkaForwardedEventRouterModule extends AbstractModule {
+  private KafkaConfiguration kafkaConfig;
+  private KafkaConsumerModule kafkaConsumerModule;
+
+  @Inject
+  public KafkaForwardedEventRouterModule(
+      KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
+    this.kafkaConfig = config;
+    this.kafkaConsumerModule = kafkaConsumerModule;
+  }
+
+  @Override
+  protected void configure() {
+    if (kafkaConfig.kafkaSubscriber().enabled()) {
+      bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
+      bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
+      bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
+      bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
+
+      install(kafkaConsumerModule);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index e6f7921..e93d5f3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,6 +17,8 @@
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.gerrit.server.config.SitePaths;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -36,18 +38,18 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private KafkaConfiguration kafkaConfig;
-
   @Mock private NoteDbStatus noteDb;
 
+  @Mock private KafkaForwardedEventRouterModule routerModule;
+  @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private Module module;
 
   @Before
-  public void setUp() {
-    module = new Module(configMock, noteDb, kafkaConfig);
+  public void setup() {
+    module = new Module(configMock, noteDb, routerModule, brokerForwarderModule);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
similarity index 83%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ab0d3bc..ba423ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -12,15 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,14 +32,16 @@
 public class KafkaConfigurationTest {
 
   private Config globalPluginConfig;
+  private Configuration multiSiteConfig;
 
   @Before
-  public void setUp() {
+  public void setup() {
     globalPluginConfig = new Config();
+    multiSiteConfig = new Configuration(globalPluginConfig, new Config());
   }
 
   private KafkaConfiguration getConfiguration() {
-    return new KafkaConfiguration(globalPluginConfig);
+    return new KafkaConfiguration(multiSiteConfig);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
index 3aa72de..ec03a40 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
@@ -19,7 +19,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEntry;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 0820fbe..24de6b5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -38,11 +38,13 @@
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -98,15 +100,24 @@
     private final Module multiSiteModule;
 
     @Inject
-    public KafkaTestContainerModule(SitePaths sitePaths, NoteDbStatus noteDb) {
+    public KafkaTestContainerModule(SitePaths sitePaths, NoteDbStatus noteDb) throws IOException {
       this.config =
           new FileBasedConfig(
               sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
+      config.setBoolean("kafka", "publisher", "enabled", true);
+      config.setBoolean("kafka", "subscriber", "enabled", true);
+      config.setBoolean("ref-database", null, "enabled", false);
+      config.save();
+
+      Configuration multiSiteConfig = new Configuration(config, new Config());
+      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
       this.multiSiteModule =
           new Module(
-              new Configuration(config, new Config()),
+              multiSiteConfig,
               noteDb,
-              new KafkaConfiguration(config),
+              new KafkaForwardedEventRouterModule(
+                  kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
+              new KafkaBrokerForwarderModule(kafkaConfiguration),
               true);
     }
 
@@ -129,9 +140,6 @@
       kafkaContainer.start();
 
       config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
-      config.setBoolean("kafka", "publisher", "enabled", true);
-      config.setBoolean("kafka", "subscriber", "enabled", true);
-      config.setBoolean("ref-database", null, "enabled", false);
       config.save();
 
       return kafkaContainer;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
index aa3d3de..df7c2fc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
@@ -28,7 +28,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import java.util.Optional;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
index 7b3cc74..00a239b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
@@ -18,7 +18,7 @@
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedProjectListUpdateHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
index eeddadf..147f275 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
@@ -22,7 +22,7 @@
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;