Merge Kafka-related modules into single KafkaBrokerModule

Merge all Kafka-related modules in preparation to moving it
to a separate plugin.

Feature: Issue 10829
Change-Id: I4a06c4d7247512563219ff3a5edeba312888b535
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 637ceef..d445251 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -20,9 +20,8 @@
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class PluginModule extends LifecycleModule {
@@ -30,19 +29,16 @@
 
   private Configuration config;
   private ZkValidationModule zkValidationModule;
-  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
-  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+  private KafkaBrokerModule kafkaBrokerModule;
 
   @Inject
   public PluginModule(
       Configuration config,
       ZkValidationModule zkValidationModule,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule) {
+      KafkaBrokerModule kafkaBrokerModule) {
     this.config = config;
     this.zkValidationModule = zkValidationModule;
-    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
-    this.kafkaBrokerForwarderModule = brokerForwarderModule;
+    this.kafkaBrokerModule = kafkaBrokerModule;
   }
 
   @Override
@@ -55,7 +51,6 @@
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
     listener().to(KafkaBrokerApi.class);
 
-    install(kafkaForwardedEventRouterModule);
-    install(kafkaBrokerForwarderModule);
+    install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
deleted file mode 100644
index a49aa8b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-
-public class KafkaBrokerForwarderModule extends LifecycleModule {
-  private final KafkaConfiguration config;
-
-  @Inject
-  public KafkaBrokerForwarderModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-    if (config.kafkaPublisher().enabled()) {
-      listener().to(BrokerPublisher.class);
-      bind(BrokerSession.class).to(KafkaSession.class);
-
-      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
-        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
-        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-            .to(BrokerCacheEvictionForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-            .to(BrokerProjectListUpdateForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
new file mode 100644
index 0000000..be57b18
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -0,0 +1,95 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaBrokerModule extends LifecycleModule {
+  private KafkaConfiguration config;
+
+  @Inject
+  public KafkaBrokerModule(KafkaConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  protected void configure() {
+    if (config.kafkaSubscriber().enabled()) {
+      bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+      bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+          .to(KafkaEventDeserializer.class);
+
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+      }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
+      }
+
+      DynamicSet.setOf(binder(), DroppedEventListener.class);
+    }
+
+    if (config.kafkaPublisher().enabled()) {
+      listener().to(BrokerPublisher.class);
+      bind(BrokerSession.class).to(KafkaSession.class);
+
+      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
+        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+            .to(BrokerCacheEvictionForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+            .to(BrokerProjectListUpdateForwarder.class);
+      }
+      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
deleted file mode 100644
index b7bcabd..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaConsumerModule extends LifecycleModule {
-
-  private KafkaConfiguration config;
-
-  @Inject
-  public KafkaConsumerModule(KafkaConfiguration config) {
-    this.config = config;
-  }
-
-  @Override
-  protected void configure() {
-
-    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
-        .to(KafkaEventDeserializer.class);
-
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
-    }
-    if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-    }
-
-    DynamicSet.setOf(binder(), DroppedEventListener.class);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
deleted file mode 100644
index 2e05de8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-
-public class KafkaForwardedEventRouterModule extends AbstractModule {
-  private KafkaConfiguration kafkaConfig;
-  private KafkaConsumerModule kafkaConsumerModule;
-
-  @Inject
-  public KafkaForwardedEventRouterModule(
-      KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
-    this.kafkaConfig = config;
-    this.kafkaConsumerModule = kafkaConsumerModule;
-  }
-
-  @Override
-  protected void configure() {
-    if (kafkaConfig.kafkaSubscriber().enabled()) {
-      install(kafkaConsumerModule);
-    }
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 9596946..cbd192d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -47,13 +47,12 @@
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -136,10 +135,7 @@
           new PluginModule(
               multiSiteConfig,
               new ZkValidationModule(multiSiteConfig),
-              new KafkaForwardedEventRouterModule(
-                  new KafkaConfiguration(multiSiteConfig),
-                  new KafkaConsumerModule(new KafkaConfiguration(multiSiteConfig))),
-              new KafkaBrokerForwarderModule(new KafkaConfiguration(multiSiteConfig)));
+              new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
     }
 
     @Override