Move subscribers related code out of kafka module

AbstractSubscriber and subscribers thread pool setup is not related
to kafka.

Feature: Issue 10829
Change-Id: I0da37268bdbaae5264d49b37ff08c211e8ed3ec7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index 7cb59c2..f1ab1f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -17,6 +17,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 
 public class BrokerModule extends AbstractModule {
 
@@ -25,5 +26,7 @@
     DynamicItem.itemOf(binder(), BrokerApi.class);
 
     bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+    install(new SubscriberModule());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
new file mode 100644
index 0000000..940ad1f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class SubscriberModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    MultiSiteEvent.registerEventTypes();
+    bind(ExecutorService.class)
+        .annotatedWith(ConsumerExecutor.class)
+        .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
+    listener().to(MultiSiteConsumerRunner.class);
+
+    DynamicSet.setOf(binder(), AbstractSubcriber.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 911361c..b7bcabd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -20,18 +20,13 @@
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ConsumerExecutor;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
 import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -46,18 +41,11 @@
 
   @Override
   protected void configure() {
-    MultiSiteEvent.registerEventTypes();
+
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
     bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
         .to(KafkaEventDeserializer.class);
 
-    bind(ExecutorService.class)
-        .annotatedWith(ConsumerExecutor.class)
-        .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
-    listener().to(MultiSiteConsumerRunner.class);
-
-    DynamicSet.setOf(binder(), AbstractSubcriber.class);
-
     if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
       DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
     }