Rename EventFamily to EventTopic

The name EventFamily was misleading as it did not really
define a family of events but only a topic name for sending
all the events having the same structure.

Use a proper name to represent a topic as EventTopic.

Change-Id: I4e7126129585a16cf7aa32cddd528178207acfb7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
index 62bdcb0..a7732a0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -27,7 +27,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 
 public class KafkaBrokerForwarderModule extends LifecycleModule {
@@ -44,18 +44,18 @@
       listener().to(BrokerPublisher.class);
       bind(BrokerSession.class).to(KafkaSession.class);
 
-      if (config.kafkaPublisher().enabledEvent(EventFamily.INDEX_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
         DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.CACHE_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
         DynamicSet.bind(binder(), CacheEvictionForwarder.class)
             .to(BrokerCacheEvictionForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
         DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
             .to(BrokerProjectListUpdateForwarder.class);
       }
-      if (config.kafkaPublisher().enabledEvent(EventFamily.STREAM_EVENT)) {
+      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
         DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 802abc1..6594544 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -17,7 +17,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -82,7 +82,7 @@
 
   @Override
   public boolean publish(String topic, String payload) {
-    return publishToTopic(properties.getKafka().getTopic(EventFamily.fromTopic(topic)), payload);
+    return publishToTopic(properties.getKafka().getTopicAlias(EventTopic.of(topic)), payload);
   }
 
   private boolean publishToTopic(String topic, String payload) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index e354b87..e72502b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -19,7 +19,7 @@
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
 public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
@@ -32,6 +32,6 @@
 
   @Override
   public boolean evict(CacheEvictionEvent event) {
-    return broker.send(EventFamily.CACHE_EVENT.topic(), event);
+    return broker.send(EventTopic.CACHE_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index 1ca9618..c1ac522 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -17,7 +17,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
 public class BrokerIndexEventForwarder implements IndexEventForwarder {
@@ -30,6 +30,6 @@
 
   @Override
   public boolean index(IndexEvent event) {
-    return broker.send(EventFamily.INDEX_EVENT.topic(), event);
+    return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 235806d..4681b21 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
-import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily.PROJECT_LIST_EVENT;
+import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic.PROJECT_LIST_TOPIC;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -33,6 +33,6 @@
 
   @Override
   public boolean updateProjectList(ProjectListUpdateEvent event) {
-    return broker.send(PROJECT_LIST_EVENT.topic(), event);
+    return broker.send(PROJECT_LIST_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index af45e86..426e101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -19,7 +19,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
 public class BrokerStreamEventForwarder implements StreamEventForwarder {
@@ -32,6 +32,6 @@
 
   @Override
   public boolean send(Event event) {
-    return broker.send(EventFamily.STREAM_EVENT.topic(), event);
+    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
similarity index 67%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index b59f8c9..24b29b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -16,15 +16,15 @@
 
 import com.google.common.base.CaseFormat;
 
-public enum EventFamily {
-  INDEX_EVENT("GERRIT.EVENT.INDEX"),
-  CACHE_EVENT("GERRIT.EVENT.CACHE"),
-  PROJECT_LIST_EVENT("GERRIT.EVENT.PROJECT.LIST"),
-  STREAM_EVENT("GERRIT.EVENT.STREAM");
+public enum EventTopic {
+  INDEX_TOPIC("GERRIT.EVENT.INDEX"),
+  CACHE_TOPIC("GERRIT.EVENT.CACHE"),
+  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
+  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
 
   private final String topic;
 
-  private EventFamily(String topic) {
+  private EventTopic(String topic) {
     this.topic = topic;
   }
 
@@ -36,11 +36,11 @@
     return topic;
   }
 
-  public static EventFamily fromTopic(String topic) {
-    EventFamily[] eventFamilies = EventFamily.values();
-    for (EventFamily eventFamily : eventFamilies) {
-      if (eventFamily.topic.equals(topic)) {
-        return eventFamily;
+  public static EventTopic of(String topicString) {
+    EventTopic[] topics = EventTopic.values();
+    for (EventTopic topic : topics) {
+      if (topic.topic.equals(topicString)) {
+        return topic;
       }
     }
     return null;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index ea16bc3..baec520 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -24,7 +24,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,14 +106,14 @@
     return defaultValue;
   }
 
-  private static Map<EventFamily, Boolean> eventsEnabled(
+  private static Map<EventTopic, Boolean> eventsEnabled(
       Supplier<Config> config, String subsection) {
-    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
-    for (EventFamily eventFamily : EventFamily.values()) {
-      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+    Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
+    for (EventTopic topic : EventTopic.values()) {
+      String enabledConfigKey = topic.lowerCamelName() + "Enabled";
 
       eventsEnabled.put(
-          eventFamily,
+          topic,
           config
               .get()
               .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
@@ -144,18 +144,18 @@
   }
 
   public static class Kafka {
-    private final Map<EventFamily, String> eventTopics;
+    private final Map<EventTopic, String> eventTopics;
     private final String bootstrapServers;
 
-    private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+    private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
         ImmutableMap.of(
-            EventFamily.INDEX_EVENT,
+            EventTopic.INDEX_TOPIC,
             "GERRIT.EVENT.INDEX",
-            EventFamily.STREAM_EVENT,
+            EventTopic.STREAM_EVENT_TOPIC,
             "GERRIT.EVENT.STREAM",
-            EventFamily.CACHE_EVENT,
+            EventTopic.CACHE_TOPIC,
             "GERRIT.EVENT.CACHE",
-            EventFamily.PROJECT_LIST_EVENT,
+            EventTopic.PROJECT_LIST_TOPIC,
             "GERRIT.EVENT.PROJECT.LIST");
 
     Kafka(Supplier<Config> config) {
@@ -164,7 +164,7 @@
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
 
       this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+      for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
         String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
         eventTopics.put(
             topicDefault.getKey(),
@@ -172,8 +172,8 @@
       }
     }
 
-    public String getTopic(EventFamily eventType) {
-      return eventTopics.get(eventType);
+    public String getTopicAlias(EventTopic topic) {
+      return eventTopics.get(topic);
     }
 
     public String getBootstrapServers() {
@@ -199,7 +199,7 @@
     public static final boolean DEFAULT_BROKER_ENABLED = false;
 
     private final boolean enabled;
-    private final Map<EventFamily, Boolean> eventsEnabled;
+    private final Map<EventTopic, Boolean> eventsEnabled;
 
     private KafkaPublisher(Supplier<Config> cfg) {
       enabled =
@@ -230,7 +230,7 @@
       return enabled;
     }
 
-    public boolean enabledEvent(EventFamily eventType) {
+    public boolean enabledEvent(EventTopic eventType) {
       return eventsEnabled.get(eventType);
     }
   }
@@ -242,7 +242,7 @@
 
     private final boolean enabled;
     private final Integer pollingInterval;
-    private Map<EventFamily, Boolean> eventsEnabled;
+    private Map<EventTopic, Boolean> eventsEnabled;
     private final Config cfg;
 
     public KafkaSubscriber(Supplier<Config> configSupplier) {
@@ -268,8 +268,8 @@
       return enabled;
     }
 
-    public boolean enabledEvent(EventFamily eventFamily) {
-      return eventsEnabled.get(eventFamily);
+    public boolean enabledEvent(EventTopic topic) {
+      return eventsEnabled.get(topic);
     }
 
     public Properties initPropsWith(UUID instanceId) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 839dce8..906b987 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -28,7 +28,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.io.IOException;
@@ -92,10 +92,9 @@
   @Override
   public void run() {
     try {
-      final String topic = configuration.getKafka().getTopic(getEventFamily());
+      final String topic = configuration.getKafka().getTopicAlias(getTopic());
       logger.atInfo().log(
-          "Kafka consumer subscribing to topic [%s] for event family [%s]",
-          topic, getEventFamily());
+          "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, getTopic());
       consumer.subscribe(Collections.singleton(topic));
       while (!closed.get()) {
         ConsumerRecords<byte[], byte[]> consumerRecords =
@@ -113,7 +112,7 @@
     }
   }
 
-  protected abstract EventFamily getEventFamily();
+  protected abstract EventTopic getTopic();
 
   private void processRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
     try (ManualRequestContext ctx = oneOffCtx.open()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index bc93d95..9e55430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
@@ -59,7 +59,7 @@
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.INDEX_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.INDEX_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index 2031229..6663a64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
@@ -59,7 +59,7 @@
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.CACHE_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.CACHE_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index cc8a849..2943a19 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -18,7 +18,7 @@
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.concurrent.Executor;
@@ -44,22 +44,22 @@
 
     bind(Executor.class)
         .annotatedWith(ConsumerExecutor.class)
-        .toInstance(Executors.newFixedThreadPool(EventFamily.values().length));
+        .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
     listener().to(MultiSiteKafkaConsumerRunner.class);
 
     DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
 
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(KafkaCacheEvictionEventSubscriber.class);
     }
-    if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+    if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(ProjectUpdateEventSubscriber.class);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 44c381e..e581b0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
@@ -59,7 +59,7 @@
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.PROJECT_LIST_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.PROJECT_LIST_TOPIC;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 854bf05..dd882cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -23,7 +23,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
@@ -59,7 +59,7 @@
   }
 
   @Override
-  protected EventFamily getEventFamily() {
-    return EventFamily.STREAM_EVENT;
+  protected EventTopic getTopic() {
+    return EventTopic.STREAM_EVENT_TOPIC;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index d1a18c8..f31a73a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -38,7 +38,7 @@
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
@@ -129,7 +129,7 @@
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
     Event event = createSampleEvent();
     when(session.publish(any(), any())).thenReturn(true);
-    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
@@ -138,7 +138,7 @@
     Event event = createSampleEvent();
     when(session.publish(any(), any())).thenReturn(false);
 
-    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
 
@@ -146,7 +146,7 @@
   public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
     Event event = createSampleEvent();
     when(session.publish(any(), any())).thenReturn(true);
-    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
     verify(msgLog, only()).log(any(), any());
   }
 
@@ -155,7 +155,7 @@
     Event event = createSampleEvent();
     when(session.publish(any(), any())).thenReturn(false);
 
-    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
     verify(msgLog, never()).log(any(), any());
   }