Rename EventFamily to EventTopic
The name EventFamily was misleading as it did not really
define a family of events but only a topic name for sending
all the events having the same structure.
Use a proper name to represent a topic as EventTopic.
Change-Id: I4e7126129585a16cf7aa32cddd528178207acfb7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
index 62bdcb0..a7732a0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -27,7 +27,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
public class KafkaBrokerForwarderModule extends LifecycleModule {
@@ -44,18 +44,18 @@
listener().to(BrokerPublisher.class);
bind(BrokerSession.class).to(KafkaSession.class);
- if (config.kafkaPublisher().enabledEvent(EventFamily.INDEX_EVENT)) {
+ if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
}
- if (config.kafkaPublisher().enabledEvent(EventFamily.CACHE_EVENT)) {
+ if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
DynamicSet.bind(binder(), CacheEvictionForwarder.class)
.to(BrokerCacheEvictionForwarder.class);
}
- if (config.kafkaPublisher().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+ if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
.to(BrokerProjectListUpdateForwarder.class);
}
- if (config.kafkaPublisher().enabledEvent(EventFamily.STREAM_EVENT)) {
+ if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 802abc1..6594544 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -17,7 +17,7 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -82,7 +82,7 @@
@Override
public boolean publish(String topic, String payload) {
- return publishToTopic(properties.getKafka().getTopic(EventFamily.fromTopic(topic)), payload);
+ return publishToTopic(properties.getKafka().getTopicAlias(EventTopic.of(topic)), payload);
}
private boolean publishToTopic(String topic, String payload) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index e354b87..e72502b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -19,7 +19,7 @@
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@Singleton
public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
@@ -32,6 +32,6 @@
@Override
public boolean evict(CacheEvictionEvent event) {
- return broker.send(EventFamily.CACHE_EVENT.topic(), event);
+ return broker.send(EventTopic.CACHE_TOPIC.topic(), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index 1ca9618..c1ac522 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -17,7 +17,7 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
public class BrokerIndexEventForwarder implements IndexEventForwarder {
@@ -30,6 +30,6 @@
@Override
public boolean index(IndexEvent event) {
- return broker.send(EventFamily.INDEX_EVENT.topic(), event);
+ return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 235806d..4681b21 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily.PROJECT_LIST_EVENT;
+import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic.PROJECT_LIST_TOPIC;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -33,6 +33,6 @@
@Override
public boolean updateProjectList(ProjectListUpdateEvent event) {
- return broker.send(PROJECT_LIST_EVENT.topic(), event);
+ return broker.send(PROJECT_LIST_TOPIC.topic(), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index af45e86..426e101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -19,7 +19,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@Singleton
public class BrokerStreamEventForwarder implements StreamEventForwarder {
@@ -32,6 +32,6 @@
@Override
public boolean send(Event event) {
- return broker.send(EventFamily.STREAM_EVENT.topic(), event);
+ return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
similarity index 67%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index b59f8c9..24b29b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -16,15 +16,15 @@
import com.google.common.base.CaseFormat;
-public enum EventFamily {
- INDEX_EVENT("GERRIT.EVENT.INDEX"),
- CACHE_EVENT("GERRIT.EVENT.CACHE"),
- PROJECT_LIST_EVENT("GERRIT.EVENT.PROJECT.LIST"),
- STREAM_EVENT("GERRIT.EVENT.STREAM");
+public enum EventTopic {
+ INDEX_TOPIC("GERRIT.EVENT.INDEX"),
+ CACHE_TOPIC("GERRIT.EVENT.CACHE"),
+ PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
+ STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
private final String topic;
- private EventFamily(String topic) {
+ private EventTopic(String topic) {
this.topic = topic;
}
@@ -36,11 +36,11 @@
return topic;
}
- public static EventFamily fromTopic(String topic) {
- EventFamily[] eventFamilies = EventFamily.values();
- for (EventFamily eventFamily : eventFamilies) {
- if (eventFamily.topic.equals(topic)) {
- return eventFamily;
+ public static EventTopic of(String topicString) {
+ EventTopic[] topics = EventTopic.values();
+ for (EventTopic topic : topics) {
+ if (topic.topic.equals(topicString)) {
+ return topic;
}
}
return null;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index ea16bc3..baec520 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -24,7 +24,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -106,14 +106,14 @@
return defaultValue;
}
- private static Map<EventFamily, Boolean> eventsEnabled(
+ private static Map<EventTopic, Boolean> eventsEnabled(
Supplier<Config> config, String subsection) {
- Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
- for (EventFamily eventFamily : EventFamily.values()) {
- String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+ Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
+ for (EventTopic topic : EventTopic.values()) {
+ String enabledConfigKey = topic.lowerCamelName() + "Enabled";
eventsEnabled.put(
- eventFamily,
+ topic,
config
.get()
.getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
@@ -144,18 +144,18 @@
}
public static class Kafka {
- private final Map<EventFamily, String> eventTopics;
+ private final Map<EventTopic, String> eventTopics;
private final String bootstrapServers;
- private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+ private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
ImmutableMap.of(
- EventFamily.INDEX_EVENT,
+ EventTopic.INDEX_TOPIC,
"GERRIT.EVENT.INDEX",
- EventFamily.STREAM_EVENT,
+ EventTopic.STREAM_EVENT_TOPIC,
"GERRIT.EVENT.STREAM",
- EventFamily.CACHE_EVENT,
+ EventTopic.CACHE_TOPIC,
"GERRIT.EVENT.CACHE",
- EventFamily.PROJECT_LIST_EVENT,
+ EventTopic.PROJECT_LIST_TOPIC,
"GERRIT.EVENT.PROJECT.LIST");
Kafka(Supplier<Config> config) {
@@ -164,7 +164,7 @@
config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
this.eventTopics = new HashMap<>();
- for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+ for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
eventTopics.put(
topicDefault.getKey(),
@@ -172,8 +172,8 @@
}
}
- public String getTopic(EventFamily eventType) {
- return eventTopics.get(eventType);
+ public String getTopicAlias(EventTopic topic) {
+ return eventTopics.get(topic);
}
public String getBootstrapServers() {
@@ -199,7 +199,7 @@
public static final boolean DEFAULT_BROKER_ENABLED = false;
private final boolean enabled;
- private final Map<EventFamily, Boolean> eventsEnabled;
+ private final Map<EventTopic, Boolean> eventsEnabled;
private KafkaPublisher(Supplier<Config> cfg) {
enabled =
@@ -230,7 +230,7 @@
return enabled;
}
- public boolean enabledEvent(EventFamily eventType) {
+ public boolean enabledEvent(EventTopic eventType) {
return eventsEnabled.get(eventType);
}
}
@@ -242,7 +242,7 @@
private final boolean enabled;
private final Integer pollingInterval;
- private Map<EventFamily, Boolean> eventsEnabled;
+ private Map<EventTopic, Boolean> eventsEnabled;
private final Config cfg;
public KafkaSubscriber(Supplier<Config> configSupplier) {
@@ -268,8 +268,8 @@
return enabled;
}
- public boolean enabledEvent(EventFamily eventFamily) {
- return eventsEnabled.get(eventFamily);
+ public boolean enabledEvent(EventTopic topic) {
+ return eventsEnabled.get(topic);
}
public Properties initPropsWith(UUID instanceId) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 839dce8..906b987 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -28,7 +28,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.io.IOException;
@@ -92,10 +92,9 @@
@Override
public void run() {
try {
- final String topic = configuration.getKafka().getTopic(getEventFamily());
+ final String topic = configuration.getKafka().getTopicAlias(getTopic());
logger.atInfo().log(
- "Kafka consumer subscribing to topic [%s] for event family [%s]",
- topic, getEventFamily());
+ "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, getTopic());
consumer.subscribe(Collections.singleton(topic));
while (!closed.get()) {
ConsumerRecords<byte[], byte[]> consumerRecords =
@@ -113,7 +112,7 @@
}
}
- protected abstract EventFamily getEventFamily();
+ protected abstract EventTopic getTopic();
private void processRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
try (ManualRequestContext ctx = oneOffCtx.open()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index bc93d95..9e55430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -23,7 +23,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
@@ -59,7 +59,7 @@
}
@Override
- protected EventFamily getEventFamily() {
- return EventFamily.INDEX_EVENT;
+ protected EventTopic getTopic() {
+ return EventTopic.INDEX_TOPIC;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index 2031229..6663a64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -23,7 +23,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
@@ -59,7 +59,7 @@
}
@Override
- protected EventFamily getEventFamily() {
- return EventFamily.CACHE_EVENT;
+ protected EventTopic getTopic() {
+ return EventTopic.CACHE_TOPIC;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index cc8a849..2943a19 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -18,7 +18,7 @@
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.concurrent.Executor;
@@ -44,22 +44,22 @@
bind(Executor.class)
.annotatedWith(ConsumerExecutor.class)
- .toInstance(Executors.newFixedThreadPool(EventFamily.values().length));
+ .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
listener().to(MultiSiteKafkaConsumerRunner.class);
DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
- if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
}
- if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
}
- if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
.to(KafkaCacheEvictionEventSubscriber.class);
}
- if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
.to(ProjectUpdateEventSubscriber.class);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 44c381e..e581b0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -23,7 +23,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
@@ -59,7 +59,7 @@
}
@Override
- protected EventFamily getEventFamily() {
- return EventFamily.PROJECT_LIST_EVENT;
+ protected EventTopic getTopic() {
+ return EventTopic.PROJECT_LIST_TOPIC;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 854bf05..dd882cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -23,7 +23,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
@@ -59,7 +59,7 @@
}
@Override
- protected EventFamily getEventFamily() {
- return EventFamily.STREAM_EVENT;
+ protected EventTopic getTopic() {
+ return EventTopic.STREAM_EVENT_TOPIC;
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index d1a18c8..f31a73a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -38,7 +38,7 @@
import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
@@ -129,7 +129,7 @@
public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
Event event = createSampleEvent();
when(session.publish(any(), any())).thenReturn(true);
- publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+ publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
}
@@ -138,7 +138,7 @@
Event event = createSampleEvent();
when(session.publish(any(), any())).thenReturn(false);
- publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+ publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
}
@@ -146,7 +146,7 @@
public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
Event event = createSampleEvent();
when(session.publish(any(), any())).thenReturn(true);
- publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+ publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
verify(msgLog, only()).log(any(), any());
}
@@ -155,7 +155,7 @@
Event event = createSampleEvent();
when(session.publish(any(), any())).thenReturn(false);
- publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
+ publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
verify(msgLog, never()).log(any(), any());
}