Merge branch 'stable-3.0'

* stable-3.0:
  Move routers configuration out of kafka domain
  Move subscribers related code out of kafka module
  Use BrokerApi for events consuming
  Move SourceAwareEventWrapper out of the kafka package
  Move Kafka consumer logic to a separate class
  Split event consumer logic into separate steps
  Move broker publish metrics to the BrokerApiWrapper
  Introduce DynamicItem<BrokerApi> and its BrokerApiWrapper
  Move BrokerPublisher into the Kafka package domain
  Rename EventFamily to EventTopic
  Use BrokerApi for sending stream events
  Use BrokerApi for sending project list updates
  Use BrokerApi for sending index events
  Introduce the abstract BrokerApi
  Message subscriber metric for number of message consumed
  Skip message logging when publishing failed
  ForwardedIndexChangeHandler: Suppress FutureReturnValueIgnored warning

Leverage the MultiSiteMetrics hierarchy in SubscriberMetrics to
make it easier to create new metrics.

Change-Id: Icb1531ad958fed1b9a4e9396ad5d4d7379c8d0f6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index 3070c2a..db7dd63 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -19,7 +19,7 @@
 import com.google.gerrit.server.util.SystemLog;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import org.apache.log4j.PatternLayout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index 8c8d949..8b07115 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 
 public interface MessageLogger {
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 1ac194d..30c188a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -25,11 +25,15 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
@@ -90,8 +94,13 @@
       install(new IndexModule());
     }
 
-    install(kafkaForwardedEventRouterModule);
+    install(new BrokerModule());
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+    listener().to(KafkaBrokerApi.class);
 
+    install(new RouterModule());
+
+    install(kafkaForwardedEventRouterModule);
     install(kafkaBrokerForwarderModule);
 
     if (config.getSharedRefDb().isEnabled()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
new file mode 100644
index 0000000..35350e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+/** API for sending/receiving events through a message Broker. */
+public interface BrokerApi {
+
+  /**
+   * Send an event to a topic.
+   *
+   * @param topic the topic the event is sent to
+   * @param event the event to send
+   * @return true if the event was successfully sent, false otherwise.
+   */
+  boolean send(String topic, Event event);
+
+  /**
+   * Asynchronously receive events from a topic.
+   *
+   * @param topic the topic to receive events from
+   * @param eventConsumer the consumer that is handed the received events
+   */
+  void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
new file mode 100644
index 0000000..e83fe53
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.Event;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+public class BrokerApiWrapper implements BrokerApi {
+  private final DynamicItem<BrokerApi> apiDelegate;
+  private final BrokerMetrics metrics;
+
+  @Inject
+  public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics) {
+    this.apiDelegate = apiDelegate;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public boolean send(String topic, Event event) {
+    boolean succeeded = false;
+    try {
+      succeeded = apiDelegate.get().send(topic, event);
+    } finally {
+      if (succeeded) {
+        metrics.incrementBrokerPublishedMessage();
+      } else {
+        metrics.incrementBrokerFailedToPublishMessage();
+      }
+    }
+    return succeeded;
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+    apiDelegate.get().receiveAync(topic, eventConsumer);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..f1ab1f6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
+
+public class BrokerModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.itemOf(binder(), BrokerApi.class);
+
+    bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+    install(new SubscriberModule());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
index 291071c..b04fbff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
@@ -14,8 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-
 public interface BrokerSession {
 
   boolean isOpen();
@@ -24,5 +22,5 @@
 
   void disconnect();
 
-  boolean publishEvent(EventFamily eventFamily, String payload);
+  boolean publish(String topic, String payload);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
similarity index 79%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
index 744a558..743d323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.broker;
+package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.events.LifecycleListener;
@@ -25,9 +25,9 @@
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,20 +40,17 @@
   private final Gson gson;
   private final UUID instanceId;
   private final MessageLogger msgLog;
-  private final BrokerMetrics brokerMetrics;
 
   @Inject
   public BrokerPublisher(
       BrokerSession session,
       @EventGson Gson gson,
       @InstanceId UUID instanceId,
-      MessageLogger msgLog,
-      BrokerMetrics brokerMetrics) {
+      MessageLogger msgLog) {
     this.session = session;
     this.gson = gson;
     this.instanceId = instanceId;
     this.msgLog = msgLog;
-    this.brokerMetrics = brokerMetrics;
   }
 
   @Override
@@ -70,18 +67,15 @@
     }
   }
 
-  public boolean publishEvent(EventFamily eventType, Event event) {
+  public boolean publish(String topic, Event event) {
     if (Context.isForwardedEvent()) {
       return true;
     }
 
     SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
-    msgLog.log(Direction.PUBLISH, brokerEvent);
-    Boolean eventPublished = session.publishEvent(eventType, getPayload(brokerEvent));
+    Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
     if (eventPublished) {
-      brokerMetrics.incrementBrokerPublishedMessage();
-    } else {
-      brokerMetrics.incrementBrokerFailedToPublishMessage();
+      msgLog.log(Direction.PUBLISH, brokerEvent);
     }
     return eventPublished;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
index 62bdcb0..a49aa8b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -17,7 +17,6 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
@@ -27,7 +26,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 
 public class KafkaBrokerForwarderModule extends LifecycleModule {
@@ -44,18 +43,18 @@
       listener().to(BrokerPublisher.class);
       bind(BrokerSession.class).to(KafkaSession.class);
 
-      if (config.kafkaPublisher().enabledEvent(EventFamily.INDEX_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
         DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.CACHE_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
         DynamicSet.bind(binder(), CacheEvictionForwarder.class)
             .to(BrokerCacheEvictionForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
         DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
             .to(BrokerProjectListUpdateForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.STREAM_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
         DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index aaf329a..6594544 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -17,7 +17,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -81,8 +81,8 @@
   }
 
   @Override
-  public boolean publishEvent(EventFamily eventType, String payload) {
-    return publishToTopic(properties.getKafka().getTopic(eventType), payload);
+  public boolean publish(String topic, String payload) {
+    return publishToTopic(properties.getKafka().getTopicAlias(EventTopic.of(topic)), payload);
   }
 
   private boolean publishToTopic(String topic, String payload) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
new file mode 100644
index 0000000..99c533d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.UUID;
+
+public abstract class AbstractSubcriber implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final BrokerApi brokerApi;
+  private final ForwardedEventRouter eventRouter;
+  private final DynamicSet<DroppedEventListener> droppedEventListeners;
+  private final Gson gson;
+  private final UUID instanceId;
+  private final MessageLogger msgLog;
+  private SubscriberMetrics subscriberMetrics;
+
+  public AbstractSubcriber(
+      BrokerApi brokerApi,
+      ForwardedEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
+      @EventGson Gson gson,
+      @InstanceId UUID instanceId,
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
+    this.eventRouter = eventRouter;
+    this.droppedEventListeners = droppedEventListeners;
+    this.gson = gson;
+    this.instanceId = instanceId;
+    this.msgLog = msgLog;
+    this.subscriberMetrics = subscriberMetrics;
+    this.brokerApi = brokerApi;
+  }
+
+  @Override
+  public void run() {
+    brokerApi.receiveAync(getTopic().topic(), this::processRecord);
+  }
+
+  protected abstract EventTopic getTopic();
+
+  // Drops events produced by this instance; otherwise logs, routes and counts the event.
+  private void processRecord(SourceAwareEventWrapper event) {
+    if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
+      logger.atFiner().log(
+          "Dropping event %s produced by our instanceId %s",
+          event.toString(), instanceId.toString());
+      droppedEventListeners.forEach(l -> l.onEventDropped(event));
+    } else {
+      try {
+        msgLog.log(Direction.CONSUME, event);
+        eventRouter.route(event.getEventBody(gson));
+        subscriberMetrics.incrementSubscriberConsumedMessage();
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log(
+            "Malformed event '%s'", event.getHeader().getEventType());
+        subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+      } catch (PermissionBackendException | CacheNotFoundException e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot handle message %s", event.getHeader().getEventType());
+        subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
similarity index 64%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 9d4222d..fc352da 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
@@ -22,41 +22,35 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
-  public KafkaCacheEvictionEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+  public CacheEvictionEventSubscriber(
+      BrokerApi brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        brokerApi,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx,
-        msgLog);
+        msgLog,
+        subscriberMetrics);
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.CACHE_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.CACHE_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
similarity index 88%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
index 10422a7..936d07a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
@@ -21,4 +21,4 @@
 
 @Retention(RUNTIME)
 @BindingAnnotation
-@interface ConsumerExecutor {}
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
similarity index 92%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 418c58b..680e8ed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 public interface DroppedEventListener {
   /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
similarity index 66%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index b2f6e8b..46a34b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
@@ -22,41 +22,35 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class IndexEventSubscriber extends AbstractKafkaSubcriber {
+public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      BrokerApi brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        brokerApi,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx,
-        msgLog);
+        msgLog,
+        subscriberMetrics);
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.INDEX_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.INDEX_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
similarity index 73%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
index 5229e54..ddaa3d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
@@ -12,25 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 @Singleton
-public class MultiSiteKafkaConsumerRunner implements LifecycleListener {
+public class MultiSiteConsumerRunner implements LifecycleListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final DynamicSet<AbstractKafkaSubcriber> consumers;
-  private final Executor executor;
+  private final DynamicSet<AbstractSubcriber> consumers;
+  private final ExecutorService executor;
 
   @Inject
-  public MultiSiteKafkaConsumerRunner(
-      @ConsumerExecutor Executor executor, DynamicSet<AbstractKafkaSubcriber> consumers) {
+  public MultiSiteConsumerRunner(
+      @ConsumerExecutor ExecutorService executor, DynamicSet<AbstractSubcriber> consumers) {
     this.consumers = consumers;
     this.executor = executor;
   }
@@ -44,6 +44,6 @@
   @Override
   public void stop() {
     logger.atInfo().log("shutting down consumers");
-    this.consumers.forEach(c -> c.shutdown());
+    executor.shutdown();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
similarity index 62%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index d359429..6f22566 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
@@ -22,41 +22,28 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
+public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      BrokerApi brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gson,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
-        eventRouter,
-        droppedEventListeners,
-        gson,
-        instanceId,
-        oneOffCtx,
-        msgLog);
+        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.PROJECT_LIST_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.PROJECT_LIST_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
similarity index 97%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
index cc638c0..b8bd0d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import static java.util.Objects.requireNonNull;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
similarity index 62%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index b1c43ba..9cbda49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventGson;
@@ -22,41 +22,28 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
-public class StreamEventSubscriber extends AbstractKafkaSubcriber {
+public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      BrokerApi brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @EventGson Gson gson,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
-        eventRouter,
-        droppedEventListeners,
-        gson,
-        instanceId,
-        oneOffCtx,
-        msgLog);
+        brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.STREAM_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.STREAM_EVENT_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
new file mode 100644
index 0000000..ef10151
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
+
+/** Counters for messages consumed, failed to be consumed and failed polls of the subscribers. */
+@Singleton
+public class SubscriberMetrics extends MultiSiteMetrics {
+  private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
+  private static final String SUBSCRIBER_FAILURE_COUNTER =
+      "subscriber_msg_consumer_failure_counter";
+  private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
+      "subscriber_msg_consumer_poll_failure_counter";
+
+  private final Counter1<String> subscriberSuccessCounter;
+  private final Counter1<String> subscriberFailureCounter;
+  private final Counter1<String> subscriberPollFailureCounter;
+
+  @Inject
+  public SubscriberMetrics(MetricMaker metricMaker) {
+    this.subscriberSuccessCounter =
+        metricMaker.newCounter(
+            "multi_site/subscriber/subscriber_message_consumer_counter",
+            new Description("Number of messages consumed by the subscriber")
+                .setRate()
+                .setUnit("messages"),
+            stringField(SUBSCRIBER_SUCCESS_COUNTER, "Subscriber message consumed count"));
+    this.subscriberFailureCounter =
+        metricMaker.newCounter(
+            "multi_site/subscriber/subscriber_message_consumer_failure_counter",
+            new Description("Number of messages failed to consume by the subscriber consumer")
+                .setRate()
+                .setUnit("errors"),
+            stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+
+    this.subscriberPollFailureCounter =
+        metricMaker.newCounter(
+            "multi_site/subscriber/subscriber_message_consumer_poll_failure_counter",
+            new Description("Number of failed attempts to poll messages by the subscriber")
+                .setRate()
+                .setUnit("errors"),
+            stringField(
+                SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
+  }
+
+  public void incrementSubscriberConsumedMessage() {
+    subscriberSuccessCounter.increment(SUBSCRIBER_SUCCESS_COUNTER);
+  }
+
+  public void incrementSubscriberFailedToConsumeMessage() {
+    subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
+  }
+
+  public void incrementSubscriberFailedToPollMessages() {
+    subscriberPollFailureCounter.increment(SUBSCRIBER_POLL_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
new file mode 100644
index 0000000..940ad1f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Binds the consumer executor, the consumer runner listener and the subscriber DynamicSet. */
+public class SubscriberModule extends LifecycleModule {
+  @Override
+  protected void configure() {
+    MultiSiteEvent.registerEventTypes();
+    bind(ExecutorService.class)
+        .annotatedWith(ConsumerExecutor.class)
+        .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
+    listener().to(MultiSiteConsumerRunner.class);
+
+    DynamicSet.setOf(binder(), AbstractSubcriber.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 36d2e75..5678d2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -29,6 +29,7 @@
 import com.googlesource.gerrit.plugins.multisite.index.ChangeCheckerImpl;
 import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
 import java.util.Optional;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -127,17 +128,19 @@
         retryCount,
         id,
         retryInterval);
-    indexExecutor.schedule(
-        () -> {
-          try (ManualRequestContext ctx = oneOffCtx.open()) {
-            Context.setForwardedEvent(true);
-            doIndex(id, indexEvent, retryCount);
-          } catch (Exception e) {
-            log.warn("Change {} could not be indexed", id, e);
-          }
-        },
-        retryInterval,
-        TimeUnit.MILLISECONDS);
+    @SuppressWarnings("unused")
+    Future<?> possiblyIgnoredError =
+        indexExecutor.schedule(
+            () -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                Context.setForwardedEvent(true);
+                doIndex(id, indexEvent, retryCount);
+              } catch (Exception e) {
+                log.warn("Change {} could not be indexed", id, e);
+              }
+            },
+            retryInterval,
+            TimeUnit.MILLISECONDS);
     return true;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index 7891336..dff21c1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -16,22 +16,23 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
 public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
-  private final BrokerPublisher publisher;
+  private final BrokerApi broker;
 
   @Inject
-  BrokerCacheEvictionForwarder(BrokerPublisher publisher) {
-    this.publisher = publisher;
+  BrokerCacheEvictionForwarder(BrokerApiWrapper broker) {
+    this.broker = broker;
   }
 
   @Override
   public boolean evict(CacheEvictionEvent event) {
-    return publisher.publishEvent(EventFamily.CACHE_EVENT, event);
+    return broker.send(EventTopic.CACHE_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index bf70c84..c2cc3dc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -15,21 +15,22 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
 public class BrokerIndexEventForwarder implements IndexEventForwarder {
-  private final BrokerPublisher publisher;
+  private final BrokerApi broker;
 
   @Inject
-  BrokerIndexEventForwarder(BrokerPublisher publisher) {
-    this.publisher = publisher;
+  BrokerIndexEventForwarder(BrokerApiWrapper broker) {
+    this.broker = broker;
   }
 
   @Override
   public boolean index(IndexEvent event) {
-    return publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index cb05a4e..1a8b652 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -14,25 +14,26 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
-import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily.PROJECT_LIST_EVENT;
+import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic.PROJECT_LIST_TOPIC;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
 @Singleton
 public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
-  private final BrokerPublisher publisher;
+  private final BrokerApi broker;
 
   @Inject
-  BrokerProjectListUpdateForwarder(BrokerPublisher publisher) {
-    this.publisher = publisher;
+  BrokerProjectListUpdateForwarder(BrokerApiWrapper broker) {
+    this.broker = broker;
   }
 
   @Override
   public boolean updateProjectList(ProjectListUpdateEvent event) {
-    return publisher.publishEvent(PROJECT_LIST_EVENT, event);
+    return broker.send(PROJECT_LIST_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 63db260..ed3a717 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -17,21 +17,22 @@
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
 public class BrokerStreamEventForwarder implements StreamEventForwarder {
-  private final BrokerPublisher publisher;
+  private final BrokerApi broker;
 
   @Inject
-  BrokerStreamEventForwarder(BrokerPublisher publisher) {
-    this.publisher = publisher;
+  BrokerStreamEventForwarder(BrokerApiWrapper broker) {
+    this.broker = broker;
   }
 
   @Override
   public boolean send(Event event) {
-    return publisher.publishEvent(EventFamily.STREAM_EVENT, event);
+    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
deleted file mode 100644
index e8d45ed..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.events;
-
-import com.google.common.base.CaseFormat;
-
-public enum EventFamily {
-  INDEX_EVENT,
-  CACHE_EVENT,
-  PROJECT_LIST_EVENT,
-  STREAM_EVENT;
-
-  public String lowerCamelName() {
-    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
new file mode 100644
index 0000000..24b29b7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.events;
+
+import com.google.common.base.CaseFormat;
+
+public enum EventTopic {
+  INDEX_TOPIC("GERRIT.EVENT.INDEX"),
+  CACHE_TOPIC("GERRIT.EVENT.CACHE"),
+  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
+  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
+
+  private final String topic;
+
+  EventTopic(String topicName) {
+    topic = topicName;
+  }
+
+  public String lowerCamelName() {
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  /** Returns the constant whose topic name equals {@code topicString}, or null if none does. */
+  public static EventTopic of(String topicString) {
+    for (EventTopic candidate : values()) {
+      if (candidate.topic.equals(topicString)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
new file mode 100644
index 0000000..6f1572a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
+
+import com.google.inject.AbstractModule;
+
+public class RouterModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
+    bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
+    bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
+    bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
new file mode 100644
index 0000000..ff23b78
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.events.Event;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/** Kafka {@link BrokerApi}: sends through a publisher and tracks the subscribers it creates. */
+public class KafkaBrokerApi implements BrokerApi, LifecycleListener {
+  private final BrokerPublisher publisher;
+  private final Provider<KafkaEventSubscriber> subscriberProvider;
+  // Guarded by its own monitor; receiveAync() appends while stop() iterates.
+  private final List<KafkaEventSubscriber> subscribers = new ArrayList<>();
+
+  @Inject
+  public KafkaBrokerApi(
+      BrokerPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
+    this.publisher = publisher;
+    this.subscriberProvider = subscriberProvider;
+  }
+
+  @Override
+  public boolean send(String topic, Event event) {
+    return publisher.publish(topic, event);
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+    KafkaEventSubscriber subscriber = subscriberProvider.get();
+    synchronized (subscribers) {
+      subscribers.add(subscriber);
+    }
+    subscriber.subscribe(EventTopic.of(topic), eventConsumer);
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {
+    synchronized (subscribers) {
+      subscribers.forEach(KafkaEventSubscriber::shutdown);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index ea16bc3..baec520 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -24,7 +24,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,14 +106,14 @@
     return defaultValue;
   }
 
-  private static Map<EventFamily, Boolean> eventsEnabled(
+  private static Map<EventTopic, Boolean> eventsEnabled(
       Supplier<Config> config, String subsection) {
-    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
-    for (EventFamily eventFamily : EventFamily.values()) {
-      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+    Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
+    for (EventTopic topic : EventTopic.values()) {
+      String enabledConfigKey = topic.lowerCamelName() + "Enabled";
 
       eventsEnabled.put(
-          eventFamily,
+          topic,
           config
               .get()
               .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
@@ -144,18 +144,18 @@
   }
 
   public static class Kafka {
-    private final Map<EventFamily, String> eventTopics;
+    private final Map<EventTopic, String> eventTopics;
     private final String bootstrapServers;
 
-    private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+    private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
         ImmutableMap.of(
-            EventFamily.INDEX_EVENT,
+            EventTopic.INDEX_TOPIC,
             "GERRIT.EVENT.INDEX",
-            EventFamily.STREAM_EVENT,
+            EventTopic.STREAM_EVENT_TOPIC,
             "GERRIT.EVENT.STREAM",
-            EventFamily.CACHE_EVENT,
+            EventTopic.CACHE_TOPIC,
             "GERRIT.EVENT.CACHE",
-            EventFamily.PROJECT_LIST_EVENT,
+            EventTopic.PROJECT_LIST_TOPIC,
             "GERRIT.EVENT.PROJECT.LIST");
 
     Kafka(Supplier<Config> config) {
@@ -164,7 +164,7 @@
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
 
       this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+      for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
         String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
         eventTopics.put(
             topicDefault.getKey(),
@@ -172,8 +172,8 @@
       }
     }
 
-    public String getTopic(EventFamily eventType) {
-      return eventTopics.get(eventType);
+    public String getTopicAlias(EventTopic topic) {
+      return eventTopics.get(topic);
     }
 
     public String getBootstrapServers() {
@@ -199,7 +199,7 @@
     public static final boolean DEFAULT_BROKER_ENABLED = false;
 
     private final boolean enabled;
-    private final Map<EventFamily, Boolean> eventsEnabled;
+    private final Map<EventTopic, Boolean> eventsEnabled;
 
     private KafkaPublisher(Supplier<Config> cfg) {
       enabled =
@@ -230,7 +230,7 @@
       return enabled;
     }
 
-    public boolean enabledEvent(EventFamily eventType) {
+    public boolean enabledEvent(EventTopic eventType) {
       return eventsEnabled.get(eventType);
     }
   }
@@ -242,7 +242,7 @@
 
     private final boolean enabled;
     private final Integer pollingInterval;
-    private Map<EventFamily, Boolean> eventsEnabled;
+    private Map<EventTopic, Boolean> eventsEnabled;
     private final Config cfg;
 
     public KafkaSubscriber(Supplier<Config> configSupplier) {
@@ -268,8 +268,8 @@
       return enabled;
     }
 
-    public boolean enabledEvent(EventFamily eventFamily) {
-      return eventsEnabled.get(eventFamily);
+    public boolean enabledEvent(EventTopic topic) {
+      return eventsEnabled.get(topic);
     }
 
     public Properties initPropsWith(UUID instanceId) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
deleted file mode 100644
index b3b409c..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.exceptions.StorageException;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.permissions.PermissionBackendException;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public abstract class AbstractKafkaSubcriber implements Runnable {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
-  private final Consumer<byte[], byte[]> consumer;
-  private final ForwardedEventRouter eventRouter;
-  private final DynamicSet<DroppedEventListener> droppedEventListeners;
-  private final Gson gson;
-  private final UUID instanceId;
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
-  private final KafkaConfiguration configuration;
-  private final OneOffRequestContext oneOffCtx;
-  private final MessageLogger msgLog;
-
-  public AbstractKafkaSubcriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
-      ForwardedEventRouter eventRouter,
-      DynamicSet<DroppedEventListener> droppedEventListeners,
-      @EventGson Gson gson,
-      @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
-      MessageLogger msgLog) {
-    this.configuration = configuration;
-    this.eventRouter = eventRouter;
-    this.droppedEventListeners = droppedEventListeners;
-    this.gson = gson;
-    this.instanceId = instanceId;
-    this.oneOffCtx = oneOffCtx;
-    this.msgLog = msgLog;
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
-      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
-    this.valueDeserializer = valueDeserializer;
-  }
-
-  @Override
-  public void run() {
-    try {
-      final String topic = configuration.getKafka().getTopic(getEventFamily());
-      logger.atInfo().log(
-          "Kafka consumer subscribing to topic [%s] for event family [%s]",
-          topic, getEventFamily());
-      consumer.subscribe(Collections.singleton(topic));
-      while (!closed.get()) {
-        ConsumerRecords<byte[], byte[]> consumerRecords =
-            consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
-        consumerRecords.forEach(this::processRecord);
-      }
-    } catch (WakeupException e) {
-      // Ignore exception if closing
-      if (!closed.get()) throw e;
-    } finally {
-      consumer.close();
-    }
-  }
-
-  protected abstract EventFamily getEventFamily();
-
-  private void processRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
-    try (ManualRequestContext ctx = oneOffCtx.open()) {
-
-      SourceAwareEventWrapper event =
-          valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-
-      if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
-        logger.atFiner().log(
-            "Dropping event %s produced by our instanceId %s",
-            event.toString(), instanceId.toString());
-        droppedEventListeners.forEach(l -> l.onEventDropped(event));
-      } else {
-        try {
-          msgLog.log(Direction.CONSUME, event);
-          eventRouter.route(event.getEventBody(gson));
-        } catch (IOException e) {
-          logger.atSevere().withCause(e).log(
-              "Malformed event '%s': [Exception: %s]", event.getHeader().getEventType());
-        } catch (PermissionBackendException | StorageException e) {
-          logger.atSevere().withCause(e).log(
-              "Cannot handle message %s: [Exception: %s]", event.getHeader().getEventType());
-        }
-      }
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log(
-          "Malformed event '%s': [Exception: %s]", new String(consumerRecord.value(), UTF_8));
-    }
-  }
-
-  // Shutdown hook which can be called from a separate thread
-  public void shutdown() {
-    closed.set(true);
-    consumer.wakeup();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index cc8a849..b7bcabd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -18,11 +18,15 @@
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -37,31 +41,22 @@
 
   @Override
   protected void configure() {
-    MultiSiteEvent.registerEventTypes();
+
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
     bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
         .to(KafkaEventDeserializer.class);
 
-    bind(Executor.class)
-        .annotatedWith(ConsumerExecutor.class)
-        .toInstance(Executors.newFixedThreadPool(EventFamily.values().length));
-    listener().to(MultiSiteKafkaConsumerRunner.class);
-
-    DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
-
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(KafkaCacheEvictionEventSubscriber.class);
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
-      DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
-          .to(ProjectUpdateEventSubscriber.class);
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+      DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
     }
 
     DynamicSet.setOf(binder(), DroppedEventListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
index 2737675..afd2656 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
@@ -18,6 +18,7 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
new file mode 100644
index 0000000..79a37eb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
@@ -0,0 +1,151 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.exceptions.StorageException;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Polls a Kafka topic and hands each deserialized {@link SourceAwareEventWrapper} to a
+ * caller-supplied processor.
+ *
+ * <p>{@link #subscribe} runs its polling loop on the calling thread until {@link #shutdown()} is
+ * invoked or polling fails; the Kafka consumer is closed when it returns.
+ */
+public class KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Consumer<byte[], byte[]> consumer;
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
+  private final KafkaConfiguration configuration;
+  private final SubscriberMetrics subscriberMetrics;
+
+  @Inject
+  public KafkaEventSubscriber(
+      KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
+      Deserializer<byte[]> keyDeserializer,
+      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      @InstanceId UUID instanceId,
+      OneOffRequestContext oneOffCtx,
+      SubscriberMetrics subscriberMetrics) {
+
+    this.configuration = configuration;
+    this.oneOffCtx = oneOffCtx;
+    this.subscriberMetrics = subscriberMetrics;
+
+    // The consumer is created with this class's class loader installed as the thread context
+    // class loader; the previous one is restored afterwards.
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
+    this.valueDeserializer = valueDeserializer;
+  }
+
+  /**
+   * Subscribes to the Kafka topic configured for {@code eventTopic} and polls it until {@link
+   * #shutdown()} is called, passing each deserialized event to {@code messageProcessor}.
+   *
+   * <p>A record whose deserialization or processing throws is logged and counted as a failed
+   * consumption, and polling continues. Any other failure is counted as a poll failure and
+   * rethrown. The consumer is closed before this method returns.
+   *
+   * @param eventTopic event topic whose configured Kafka topic alias is subscribed to
+   * @param messageProcessor callback invoked for each event inside a one-off request context
+   */
+  public void subscribe(
+      EventTopic eventTopic,
+      java.util.function.Consumer<SourceAwareEventWrapper> messageProcessor) {
+    try {
+      final String topic = configuration.getKafka().getTopicAlias(eventTopic);
+      logger.atInfo().log(
+          "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, eventTopic);
+      consumer.subscribe(Collections.singleton(topic));
+      while (!closed.get()) {
+        ConsumerRecords<byte[], byte[]> consumerRecords =
+            consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
+        consumerRecords.forEach(
+            consumerRecord -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                SourceAwareEventWrapper event =
+                    valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                messageProcessor.accept(event);
+              } catch (Exception e) {
+                // Supply both format arguments; the record value may be null (payloadToString).
+                logger.atSevere().withCause(e).log(
+                    "Malformed event '%s': [Exception: %s]",
+                    payloadToString(consumerRecord.value()), e);
+                subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+              }
+            });
+      }
+    } catch (WakeupException e) {
+      // Ignore exception if closing
+      if (!closed.get()) {
+        throw e;
+      }
+    } catch (Exception e) {
+      subscriberMetrics.incrementSubscriberFailedToPollMessages();
+      throw e;
+    } finally {
+      consumer.close();
+    }
+  }
+
+  /** Renders a record payload for logging; Kafka records may carry a {@code null} value. */
+  private static String payloadToString(byte[] payload) {
+    return payload == null ? "<null>" : new String(payload, UTF_8);
+  }
+
+  /**
+   * Requests termination of {@link #subscribe}: marks this subscriber as closed and wakes up the
+   * consumer. Can be called from a thread other than the polling one.
+   */
+  public void shutdown() {
+    closed.set(true);
+    consumer.wakeup();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
index a6422ae..2e05de8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
@@ -16,14 +16,6 @@
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedCacheEvictionEventRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedIndexEventRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedProjectListUpdateRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedStreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
 
@@ -41,11 +33,6 @@
   @Override
   protected void configure() {
     if (kafkaConfig.kafkaSubscriber().enabled()) {
-      bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
-      bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
-      bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
-      bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
-
       install(kafkaConsumerModule);
     }
   }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 83082b2..204742e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -99,3 +99,16 @@
 * Broker failed to publish message count
 
 `metric=multi_site/broker/broker_message_publisher_failure_counter/broker_msg_publisher_failure_counter, type=com.codahale.metrics.Meter`
+
+### Message subscriber
+* Subscriber message consumed count
+
+`metric=multi_site/subscriber/subscriber_message_consumer_counter/subscriber_msg_consumer_counter, type=com.codahale.metrics.Meter`
+
+* Subscriber failed to consume message count
+
+`metric=multi_site/subscriber/subscriber_message_consumer_failure_counter/subscriber_msg_consumer_failure_counter, type=com.codahale.metrics.Meter`
+
+* Subscriber failed to poll messages count
+
+`metric=multi_site/subscriber/subscriber_message_consumer_poll_failure_counter/subscriber_msg_consumer_poll_failure_counter, type=com.codahale.metrics.Meter`
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java
deleted file mode 100644
index a558be9..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
-import org.junit.Ignore;
-
-@Ignore
-public class DisabledMessageLogger implements MessageLogger {
-
-  @Override
-  public void log(Direction direction, SourceAwareEventWrapper event) {}
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
new file mode 100644
index 0000000..2abc7b0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -0,0 +1,56 @@
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerApiWrapperTest {
+  @Mock private BrokerMetrics brokerMetrics;
+  @Mock private BrokerApi brokerApi;
+  @Mock Event event;
+
+  private BrokerApiWrapper objectUnderTest;
+
+  @Before
+  public void setUp() {
+    objectUnderTest =
+        new BrokerApiWrapper(DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics);
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+    when(brokerApi.send(any(), any())).thenReturn(true);
+    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
+    when(brokerApi.send(any(), any())).thenReturn(false);
+    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedException() {
+    when(brokerApi.send(any(), any()))
+        .thenThrow(new RuntimeException("Unexpected runtime exception"));
+    try {
+      objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    } catch (RuntimeException e) {
+      // expected
+    }
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index fc14bed..d5f8b66 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -33,12 +34,10 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.googlesource.gerrit.plugins.multisite.DisabledMessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,9 +50,9 @@
 
   @Mock private BrokerMetrics brokerMetrics;
   @Mock private BrokerSession session;
+  @Mock private MessageLogger msgLog;
   private BrokerPublisher publisher;
 
-  private MessageLogger NO_MSG_LOG = new DisabledMessageLogger();
   private Gson gson = new EventGsonProvider().get();
 
   private String accountName = "Foo Bar";
@@ -73,7 +72,7 @@
 
   @Before
   public void setUp() {
-    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), NO_MSG_LOG, brokerMetrics);
+    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog);
   }
 
   @Test
@@ -124,20 +123,20 @@
   }
 
   @Test
-  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+  public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(true);
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
-    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+    when(session.publish(any(), any())).thenReturn(true);
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(msgLog, only()).log(any(), any());
   }
 
   @Test
-  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublished() {
+  public void shouldSkipEventPublishedLoggingWhenMessagePublishigFailed() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(false);
+    when(session.publish(any(), any())).thenReturn(false);
 
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
-    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(msgLog, never()).log(any(), any());
   }
 
   private Event createSampleEvent() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 73568ea..5683bfb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -41,6 +41,8 @@
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
index e44683d..76ad452 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
@@ -19,6 +19,7 @@
 
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;