Use BrokerApi for events consuming

Leverage the new BrokerApi for consuming event through
a generic message broker.

Get rid of the references to the KafkaEventSubscriber class.

Feature: Issue 10829
Change-Id: I36c40de5b03363f06315b89380c4918e0a1ae7a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 2e0fb1b..15abfac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -122,6 +122,7 @@
 
     install(new BrokerModule());
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+    listener().to(KafkaBrokerApi.class);
 
     install(kafkaForwardedEventRouterModule);
     install(kafkaBrokerForwarderModule);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
index d6e6c4c..35350e9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.function.Consumer;
 
 /** API for sending/receiving events through a message Broker. */
@@ -35,5 +36,5 @@
    * @param topic
    * @param eventConsumer
    */
-  void receiveAync(String topic, Consumer<Event> eventConsumer);
+  void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 6dc9577..e83fe53 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -17,6 +17,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.function.Consumer;
 
 public class BrokerApiWrapper implements BrokerApi {
@@ -45,7 +46,7 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<Event> eventConsumer) {
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
     apiDelegate.get().receiveAync(topic, eventConsumer);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
similarity index 84%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 569a864..6e70299 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -22,19 +22,19 @@
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+
 import java.io.IOException;
 import java.util.UUID;
 
-public abstract class AbstractKafkaSubcriber implements Runnable {
+public abstract class AbstractSubcriber implements Runnable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final KafkaEventSubscriber subscriber;
+  private final BrokerApi brokerApi;
   private final ForwardedEventRouter eventRouter;
   private final DynamicSet<DroppedEventListener> droppedEventListeners;
   private final Gson gson;
@@ -42,8 +42,8 @@
   private final MessageLogger msgLog;
   private SubscriberMetrics subscriberMetrics;
 
-  public AbstractKafkaSubcriber(
-      KafkaEventSubscriber subscriber,
+  public AbstractSubcriber(
+      BrokerApi brokerApi,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
@@ -56,12 +56,12 @@
     this.instanceId = instanceId;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
-    this.subscriber = subscriber;
+    this.brokerApi = brokerApi;
   }
 
   @Override
   public void run() {
-    subscriber.subscribe(getTopic(), this::processRecord);
+    brokerApi.receiveAync(getTopic().topic(), this::processRecord);
   }
 
   protected abstract EventTopic getTopic();
@@ -89,9 +89,4 @@
       }
     }
   }
-
-  // Shutdown hook which can be called from a separate thread
-  public void shutdown() {
-    subscriber.shutdown();
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
similarity index 83%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 26725d8..fc62e6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gson.Gson;
@@ -20,17 +20,17 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
 
 @Singleton
-public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
-  public KafkaCacheEvictionEventSubscriber(
-      KafkaEventSubscriber subscriber,
+  public CacheEvictionEventSubscriber(
+      BrokerApi brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
@@ -38,7 +38,7 @@
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        subscriber,
+        brokerApi,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
similarity index 88%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
index 10422a7..936d07a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
@@ -21,4 +21,4 @@
 
 @Retention(RUNTIME)
 @BindingAnnotation
-@interface ConsumerExecutor {}
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
similarity index 84%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 7de8487..680e8ed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 public interface DroppedEventListener {
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
similarity index 86%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 28d068e..c201f65 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gson.Gson;
@@ -20,17 +20,17 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import java.util.UUID;
 
 @Singleton
-public class IndexEventSubscriber extends AbstractKafkaSubcriber {
+public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      KafkaEventSubscriber subscriber,
+      BrokerApi brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
@@ -38,7 +38,7 @@
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        subscriber,
+        brokerApi,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
similarity index 73%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
index 5229e54..ddaa3d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
@@ -12,25 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.util.concurrent.Executor;
+
+import java.util.concurrent.ExecutorService;
 
 @Singleton
-public class MultiSiteKafkaConsumerRunner implements LifecycleListener {
+public class MultiSiteConsumerRunner implements LifecycleListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final DynamicSet<AbstractKafkaSubcriber> consumers;
-  private final Executor executor;
+  private final DynamicSet<AbstractSubcriber> consumers;
+  private final ExecutorService executor;
 
   @Inject
-  public MultiSiteKafkaConsumerRunner(
-      @ConsumerExecutor Executor executor, DynamicSet<AbstractKafkaSubcriber> consumers) {
+  public MultiSiteConsumerRunner(
+      @ConsumerExecutor ExecutorService executor, DynamicSet<AbstractSubcriber> consumers) {
     this.consumers = consumers;
     this.executor = executor;
   }
@@ -44,6 +45,6 @@
   @Override
   public void stop() {
     logger.atInfo().log("shutting down consumers");
-    this.consumers.forEach(c -> c.shutdown());
+    executor.shutdown();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
similarity index 79%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index e47a515..4157609 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gson.Gson;
@@ -20,17 +20,17 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import java.util.UUID;
 
 @Singleton
-public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
+public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      KafkaEventSubscriber subscriber,
+      BrokerApi brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
@@ -38,13 +38,7 @@
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        subscriber,
-        eventRouter,
-        droppedEventListeners,
-        gson,
-        instanceId,
-        msgLog,
-        subscriberMetrics);
+        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
similarity index 79%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index f07d969..af0f3e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gson.Gson;
@@ -20,17 +20,17 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import java.util.UUID;
 
 @Singleton
-public class StreamEventSubscriber extends AbstractKafkaSubcriber {
+public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      KafkaEventSubscriber subscriber,
+      BrokerApi brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
@@ -38,13 +38,7 @@
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        subscriber,
-        eventRouter,
-        droppedEventListeners,
-        gson,
-        instanceId,
-        msgLog,
-        subscriberMetrics);
+        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
index d9b5d10..ff23b78 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -14,20 +14,31 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka;
 
-import com.google.gerrit.extensions.restapi.NotImplementedException;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Consumer;
 
-public class KafkaBrokerApi implements BrokerApi {
+public class KafkaBrokerApi implements BrokerApi, LifecycleListener {
 
   private final BrokerPublisher publisher;
+  private final Provider<KafkaEventSubscriber> subscriberProvider;
+  private List<KafkaEventSubscriber> subscribers;
 
   @Inject
-  public KafkaBrokerApi(BrokerPublisher publisher) {
+  public KafkaBrokerApi(
+      BrokerPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
     this.publisher = publisher;
+    this.subscriberProvider = subscriberProvider;
+    subscribers = new ArrayList<>();
   }
 
   @Override
@@ -36,7 +47,21 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<Event> eventConsumer) {
-    throw new NotImplementedException();
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+    KafkaEventSubscriber subscriber = subscriberProvider.get();
+    synchronized (subscribers) {
+      subscribers.add(subscriber);
+    }
+    subscriber.subscribe(EventTopic.of(topic), eventConsumer);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {
+    for (KafkaEventSubscriber subscriber : subscribers) {
+      subscriber.shutdown();
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index b35231e..911361c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -18,11 +18,19 @@
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -43,26 +51,24 @@
     bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
         .to(KafkaEventDeserializer.class);
 
-    bind(Executor.class)
+    bind(ExecutorService.class)
         .annotatedWith(ConsumerExecutor.class)
         .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
-    listener().to(MultiSiteKafkaConsumerRunner.class);
+    listener().to(MultiSiteConsumerRunner.class);
 
-    DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
+    DynamicSet.setOf(binder(), AbstractSubcriber.class);
 
     if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
     }
     if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
     }
     if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(KafkaCacheEvictionEventSubscriber.class);
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
     }
     if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(ProjectUpdateEventSubscriber.class);
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
     }
 
     DynamicSet.setOf(binder(), DroppedEventListener.class);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 4606fdb..b69f9e6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -42,6 +42,7 @@
 import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;