Fix issue with Kafka topic alias resolving
EventTopic doesn't match one to one with kafka topic configuration key.
This was an incompatible change because previous version is
producing/consuming messages with topics from config file while
the new code is using EventTopic class.
Change-Id: I7444286a9532686bc1d051bdb75fa4a5be233318
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 24b29b7..7d42acc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -14,28 +14,32 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.events;
-import com.google.common.base.CaseFormat;
-
public enum EventTopic {
- INDEX_TOPIC("GERRIT.EVENT.INDEX"),
- CACHE_TOPIC("GERRIT.EVENT.CACHE"),
- PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
- STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
+ INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+ CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
+ PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
+ STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
private final String topic;
+ private final String aliasKey;
- private EventTopic(String topic) {
+ private EventTopic(String topic, String aliasKey) {
this.topic = topic;
- }
-
- public String lowerCamelName() {
- return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
+ this.aliasKey = aliasKey;
}
public String topic() {
return topic;
}
+ public String topicAliasKey() {
+ return aliasKey + "Topic";
+ }
+
+ public String enabledKey() {
+ return aliasKey + "Enabled";
+ }
+
public static EventTopic of(String topicString) {
EventTopic[] topics = EventTopic.values();
for (EventTopic topic : topics) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index baec520..97528de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -20,7 +20,6 @@
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -110,13 +109,12 @@
Supplier<Config> config, String subsection) {
Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
for (EventTopic topic : EventTopic.values()) {
- String enabledConfigKey = topic.lowerCamelName() + "Enabled";
-
eventsEnabled.put(
topic,
config
.get()
- .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
+ .getBoolean(
+ KAFKA_SECTION, subsection, topic.enabledKey(), DEFAULT_ENABLE_PROCESSING));
}
return eventsEnabled;
}
@@ -147,28 +145,16 @@
private final Map<EventTopic, String> eventTopics;
private final String bootstrapServers;
- private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
- ImmutableMap.of(
- EventTopic.INDEX_TOPIC,
- "GERRIT.EVENT.INDEX",
- EventTopic.STREAM_EVENT_TOPIC,
- "GERRIT.EVENT.STREAM",
- EventTopic.CACHE_TOPIC,
- "GERRIT.EVENT.CACHE",
- EventTopic.PROJECT_LIST_TOPIC,
- "GERRIT.EVENT.PROJECT.LIST");
-
Kafka(Supplier<Config> config) {
this.bootstrapServers =
getString(
config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
this.eventTopics = new HashMap<>();
- for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
- String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+ for (EventTopic eventTopic : EventTopic.values()) {
eventTopics.put(
- topicDefault.getKey(),
- getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+ eventTopic,
+ getString(config, KAFKA_SECTION, null, eventTopic.topicAliasKey(), eventTopic.topic()));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ba423ff..84e1dd8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -22,6 +22,7 @@
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
@@ -121,4 +122,79 @@
assertThat(property).isNull();
}
+
+ @Test
+ public void shouldReturnKafkaTopicAliasForIndexTopic() {
+ setKafkaTopicAlias("indexEventTopic", "gerrit_index");
+ final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.INDEX_TOPIC);
+
+ assertThat(property).isEqualTo("gerrit_index");
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicAliasForStreamEventTopic() {
+ setKafkaTopicAlias("streamEventTopic", "gerrit_stream_events");
+ final String property =
+ getConfiguration().getKafka().getTopicAlias(EventTopic.STREAM_EVENT_TOPIC);
+
+ assertThat(property).isEqualTo("gerrit_stream_events");
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicAliasForProjectListEventTopic() {
+ setKafkaTopicAlias("projectListEventTopic", "gerrit_project_list");
+ final String property =
+ getConfiguration().getKafka().getTopicAlias(EventTopic.PROJECT_LIST_TOPIC);
+
+ assertThat(property).isEqualTo("gerrit_project_list");
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicAliasForCacheEventTopic() {
+ setKafkaTopicAlias("cacheEventTopic", "gerrit_cache");
+ final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.CACHE_TOPIC);
+
+ assertThat(property).isEqualTo("gerrit_cache");
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicEnabledForCacheEventTopic() {
+ setKafkaTopicEnabled("cacheEventEnabled", false);
+ final Boolean property =
+ getConfiguration().kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC);
+ assertThat(property).isFalse();
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicEnabledForIndexTopic() {
+ setKafkaTopicEnabled("indexEventEnabled", false);
+ final Boolean property =
+ getConfiguration().kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC);
+ assertThat(property).isFalse();
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicEnabledForStreamEventTopic() {
+ setKafkaTopicEnabled("streamEventEnabled", false);
+ final Boolean property =
+ getConfiguration().kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC);
+ assertThat(property).isFalse();
+ }
+
+ @Test
+ public void shouldReturnKafkaTopicEnabledForProjectListEventTopic() {
+ setKafkaTopicEnabled("projectListEventEnabled", false);
+ final Boolean property =
+ getConfiguration().kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC);
+ assertThat(property).isFalse();
+ }
+
+ private void setKafkaTopicAlias(String topicKey, String topic) {
+ globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
+ }
+
+ private void setKafkaTopicEnabled(String topicEnabledKey, Boolean isEnabled) {
+ globalPluginConfig.setBoolean(
+ KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, topicEnabledKey, isEnabled);
+ }
}