Merge branch 'stable-3.0'

* stable-3.0:
  Extra logging when local ref-db is misaligned with shared-db
  Kafka module for multi-site

Change-Id: I49790e42bd331f6d5d843a138c8ad3709b721a2d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 7348137..ca01471 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -45,19 +45,19 @@
 
   public static final String PLUGIN_NAME = "multi-site";
   public static final String MULTI_SITE_CONFIG = PLUGIN_NAME + ".config";
-  public static final String REPLICATION_CONFIG = "replication.config";
 
   static final String INSTANCE_ID_FILE = "instanceId.data";
-
-  // common parameters to cache and index sections
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
-  static final int DEFAULT_INDEX_MAX_TRIES = 2;
-  static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
-  static final String NUM_STRIPED_LOCKS = "numStripedLocks";
-  static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
   static final String ENABLE_KEY = "enabled";
 
+  private static final String REPLICATION_CONFIG = "replication.config";
+  // common parameters to cache and index sections
+  private static final int DEFAULT_INDEX_MAX_TRIES = 2;
+  private static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
+  private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
+  private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b8256a9..1432c65 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -24,13 +24,12 @@
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -46,12 +45,16 @@
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
+  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
+  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
   private final boolean disableGitRepositoryValidation;
-  private KafkaConfiguration kafkaConfig;
 
   @Inject
-  public Module(Configuration config, KafkaConfiguration kafkaConfig) {
-    this(config, kafkaConfig, false);
+  public Module(
+      Configuration config,
+      KafkaForwardedEventRouterModule forwardedEventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule) {
+    this(config, forwardedEventRouterModule, brokerForwarderModule, false);
   }
 
   // TODO: It is not possible to properly test the libModules in Gerrit.
@@ -61,15 +64,20 @@
   @VisibleForTesting
   public Module(
       Configuration config,
-      KafkaConfiguration kafkaConfig,
+      KafkaForwardedEventRouterModule forwardedEventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule,
       boolean disableGitRepositoryValidation) {
-    init(config, kafkaConfig);
+    init(config, forwardedEventRouterModule, brokerForwarderModule);
     this.disableGitRepositoryValidation = disableGitRepositoryValidation;
   }
 
-  private void init(Configuration config, KafkaConfiguration kafkaConfig) {
+  private void init(
+      Configuration config,
+      KafkaForwardedEventRouterModule forwardedEventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule) {
     this.config = config;
-    this.kafkaConfig = kafkaConfig;
+    this.kafkaForwardedEventRouterModule = forwardedEventRouterModule;
+    this.kafkaBrokerForwarderModule = brokerForwarderModule;
   }
 
   @Override
@@ -95,13 +103,9 @@
       install(new IndexModule());
     }
 
-    if (kafkaConfig.kafkaSubscriber().enabled()) {
-      install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
-      install(new ForwardedEventRouterModule());
-    }
-    if (kafkaConfig.kafkaPublisher().enabled()) {
-      install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
-    }
+    install(kafkaForwardedEventRouterModule);
+
+    install(kafkaBrokerForwarderModule);
 
     if (config.getSharedRefDb().isEnabled()) {
       DynamicSet.bind(binder(), ProjectDeletedListener.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
new file mode 100644
index 0000000..62bdcb0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+
+/**
+ * Guice module wiring the Kafka-backed publishing side of the plugin.
+ *
+ * <p>Nothing is bound unless the Kafka publisher is enabled in {@link KafkaConfiguration}; when it
+ * is, one forwarder is registered per enabled {@link EventFamily}.
+ */
+public class KafkaBrokerForwarderModule extends LifecycleModule {
+  private final KafkaConfiguration config;
+
+  /**
+   * Creates the module.
+   *
+   * @param config Kafka settings consulted in {@link #configure()} to decide what gets bound
+   */
+  @Inject
+  public KafkaBrokerForwarderModule(KafkaConfiguration config) {
+    this.config = config;
+  }
+
+  /** Registers the broker publisher, session and per-family forwarders when publishing is on. */
+  @Override
+  protected void configure() {
+    KafkaConfiguration.KafkaPublisher publisher = config.kafkaPublisher();
+    if (!publisher.enabled()) {
+      // Publishing is switched off: leave every binding unset.
+      return;
+    }
+
+    listener().to(BrokerPublisher.class);
+    bind(BrokerSession.class).to(KafkaSession.class);
+
+    if (publisher.enabledEvent(EventFamily.INDEX_EVENT)) {
+      DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+    }
+    if (publisher.enabledEvent(EventFamily.CACHE_EVENT)) {
+      DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+          .to(BrokerCacheEvictionForwarder.class);
+    }
+    if (publisher.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+      DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+          .to(BrokerProjectListUpdateForwarder.class);
+    }
+    if (publisher.enabledEvent(EventFamily.STREAM_EVENT)) {
+      DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 7c83346..aaf329a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -16,9 +16,9 @@
 
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
deleted file mode 100644
index b7ee20b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-
-public class BrokerForwarderModule extends LifecycleModule {
-  private final KafkaPublisher kafkaPublisher;
-
-  public BrokerForwarderModule(KafkaPublisher kafkaPublisher) {
-    this.kafkaPublisher = kafkaPublisher;
-  }
-
-  @Override
-  protected void configure() {
-    listener().to(BrokerPublisher.class);
-    bind(BrokerSession.class).to(KafkaSession.class);
-
-    if (kafkaPublisher.enabledEvent(EventFamily.INDEX_EVENT)) {
-      DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.CACHE_EVENT)) {
-      DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-          .to(BrokerCacheEvictionForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
-      DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-          .to(BrokerProjectListUpdateForwarder.class);
-    }
-    if (kafkaPublisher.enabledEvent(EventFamily.STREAM_EVENT)) {
-      DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 808245f..cb05a4e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
 @Singleton
-class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
+public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
   private final BrokerPublisher publisher;
 
   @Inject
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
similarity index 96%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
index 53afc40..eaf1ff7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
index 7f740d5..79ab0b9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
similarity index 93%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
index f7de8cc..f9ad0c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
index 59dcaa4..a843fcc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
similarity index 92%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
index 1df3129..37cc3de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
index c96ab1a..0e50ea9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.events.Event;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
similarity index 98%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 958312a..1e21ae2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.DELETE;
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
similarity index 95%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
index 2cce12c..2248c4c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
similarity index 94%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 39d96bc..a330416 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.permissions.PermissionBackendException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index b88699a..ea16bc3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -12,19 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.base.Suppliers.memoize;
 import static com.google.common.base.Suppliers.ofInstance;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,7 +34,6 @@
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +41,11 @@
 public class KafkaConfiguration {
 
   private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
-  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
   static final String KAFKA_SECTION = "kafka";
   static final String ENABLE_KEY = "enabled";
-  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  static final boolean DEFAULT_ENABLE_PROCESSING = true;
+  private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+  private static final boolean DEFAULT_ENABLE_PROCESSING = true;
   private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
 
   private final Supplier<KafkaSubscriber> subscriber;
@@ -55,22 +53,13 @@
   private final Supplier<KafkaPublisher> publisher;
 
   @Inject
-  KafkaConfiguration(SitePaths sitePaths) {
-    this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
-  }
-
-  @VisibleForTesting
-  public KafkaConfiguration(Config kafkaConfig) {
-    Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+  public KafkaConfiguration(Configuration configuration) {
+    Supplier<Config> lazyCfg = lazyLoad(configuration.getMultiSiteConfig());
     kafka = memoize(() -> new Kafka(lazyCfg));
     publisher = memoize(() -> new KafkaPublisher(lazyCfg));
     subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
   }
 
-  private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
-    return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
-  }
-
   public Kafka getKafka() {
     return kafka.get();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e4d3fff..25d1b6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -25,11 +25,11 @@
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 589afce..c8b3b55 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -21,10 +21,10 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
similarity index 87%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index a5d4c59..bfc0e8d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -21,17 +21,17 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
-  public CacheEvictionEventSubscriber(
+  public KafkaCacheEvictionEventSubscriber(
       KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 3e0e5d7..cc8a849 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -16,10 +16,11 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -27,10 +28,11 @@
 
 public class KafkaConsumerModule extends LifecycleModule {
 
-  private final KafkaSubscriber kafkaSubscriber;
+  private final KafkaConfiguration config;
 
-  public KafkaConsumerModule(KafkaSubscriber kafkaSubscriber) {
-    this.kafkaSubscriber = kafkaSubscriber;
+  @Inject
+  public KafkaConsumerModule(KafkaConfiguration config) {
+    this.config = config;
   }
 
   @Override
@@ -47,17 +49,17 @@
 
     DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
 
-    if (kafkaSubscriber.enabledEvent(EventFamily.INDEX_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.STREAM_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.CACHE_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(CacheEvictionEventSubscriber.class);
+          .to(KafkaCacheEvictionEventSubscriber.class);
     }
-    if (kafkaSubscriber.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(ProjectUpdateEventSubscriber.class);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 0845595..351d8ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -21,10 +21,10 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 2a1e155..a15c7cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -21,10 +21,10 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
deleted file mode 100644
index dd11180..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-
-public class ForwardedEventRouterModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
-    bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
-    bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
-    bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
new file mode 100644
index 0000000..a6422ae
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
@@ -0,0 +1,61 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.router;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedCacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedIndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedStreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
+
+/**
+ * Guice module wiring the forwarded-event router interfaces to their implementations and
+ * installing the Kafka consumer module, but only when the Kafka subscriber is enabled.
+ */
+public class KafkaForwardedEventRouterModule extends AbstractModule {
+  private final KafkaConfiguration kafkaConfig;
+  private final KafkaConsumerModule kafkaConsumerModule;
+
+  /**
+   * @param config Kafka settings whose subscriber section gates every binding made here
+   * @param kafkaConsumerModule consumer module installed together with the router bindings
+   */
+  @Inject
+  public KafkaForwardedEventRouterModule(
+      KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
+    this.kafkaConfig = config;
+    this.kafkaConsumerModule = kafkaConsumerModule;
+  }
+
+  /** Binds the routers and installs the consumer module only if the subscriber is enabled. */
+  @Override
+  protected void configure() {
+    if (kafkaConfig.kafkaSubscriber().enabled()) {
+      bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
+      bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
+      bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
+      bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
+
+      install(kafkaConsumerModule);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
index afd3766..efbf390 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -58,10 +58,22 @@
 
       // Assuming this is a delete node NULL_REF
       if (valueInZk == null) {
+        logger.atInfo().log(
+            "%s:%s not found in Zookeeper, assumed as delete node NULL_REF",
+            project, ref.getName());
         return false;
       }
 
-      return readObjectId(valueInZk).equals(ref.getObjectId());
+      ObjectId objectIdInSharedRefDb = readObjectId(valueInZk);
+      Boolean isUpToDate = objectIdInSharedRefDb.equals(ref.getObjectId());
+
+      if (!isUpToDate) {
+        logger.atWarning().log(
+            "%s:%s is out of sync: local=%s zk=%s",
+            project, ref.getName(), ref.getObjectId(), objectIdInSharedRefDb);
+      }
+
+      return isUpToDate;
     } catch (Exception e) {
       throw new SharedLockException(project, ref.getName(), e);
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 556917b..59df7bb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,6 +17,8 @@
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.gerrit.server.config.SitePaths;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -36,16 +38,16 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private KafkaConfiguration kafkaConfig;
+  @Mock private KafkaForwardedEventRouterModule routerModule;
+  @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private Module module;
 
   @Before
-  public void setUp() {
-    module = new Module(configMock, kafkaConfig);
+  public void setup() {
+    module = new Module(configMock, routerModule, brokerForwarderModule);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
similarity index 83%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ab0d3bc..ba423ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -12,15 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,14 +32,16 @@
 public class KafkaConfigurationTest {
 
   private Config globalPluginConfig;
+  private Configuration multiSiteConfig;
 
   @Before
-  public void setUp() {
+  public void setup() {
     globalPluginConfig = new Config();
+    multiSiteConfig = new Configuration(globalPluginConfig, new Config());
   }
 
   private KafkaConfiguration getConfiguration() {
-    return new KafkaConfiguration(globalPluginConfig);
+    return new KafkaConfiguration(multiSiteConfig);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
index ad09323..a19134d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
@@ -21,7 +21,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.GsonParser;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 60381ae..aa95fed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -39,9 +39,11 @@
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -92,12 +94,24 @@
     private final Module multiSiteModule;
 
     @Inject
-    public KafkaTestContainerModule(SitePaths sitePaths) {
+    public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
       this.config =
           new FileBasedConfig(
               sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
+      config.setBoolean("kafka", "publisher", "enabled", true);
+      config.setBoolean("kafka", "subscriber", "enabled", true);
+      config.setBoolean("ref-database", null, "enabled", false);
+      config.save();
+
+      Configuration multiSiteConfig = new Configuration(config, new Config());
+      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
       this.multiSiteModule =
-          new Module(new Configuration(config, new Config()), new KafkaConfiguration(config), true);
+          new Module(
+              multiSiteConfig,
+              new KafkaForwardedEventRouterModule(
+                  kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
+              new KafkaBrokerForwarderModule(kafkaConfiguration),
+              true);
     }
 
     @Override
@@ -119,9 +133,6 @@
       kafkaContainer.start();
 
       config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
-      config.setBoolean("kafka", "publisher", "enabled", true);
-      config.setBoolean("kafka", "subscriber", "enabled", true);
-      config.setBoolean("ref-database", null, "enabled", false);
       config.save();
       Configuration multiSiteConfig = new Configuration(config, new Config());
       bind(Configuration.class).toInstance(multiSiteConfig);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
index 750f466..e2cf5ef 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
@@ -29,7 +29,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
index 7b3cc74..00a239b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
@@ -18,7 +18,7 @@
 
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedProjectListUpdateHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
index 0caccd5..ef6c04f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
@@ -22,7 +22,7 @@
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;