Fix issue with Kafka topic alias resolving

EventTopic doesn't match one to one with kafka topic configuration key.
This was an incompatible change because previous version is
producing/consuming messages with topics from config file while
the new code is using EventTopic class.

Change-Id: I7444286a9532686bc1d051bdb75fa4a5be233318
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 24b29b7..7d42acc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -14,28 +14,32 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
-import com.google.common.base.CaseFormat;
-
 public enum EventTopic {
-  INDEX_TOPIC("GERRIT.EVENT.INDEX"),
-  CACHE_TOPIC("GERRIT.EVENT.CACHE"),
-  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST"),
-  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM");
+  INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+  CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
+  PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
+  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
 
   private final String topic;
+  private final String aliasKey;
 
-  private EventTopic(String topic) {
+  private EventTopic(String topic, String aliasKey) {
     this.topic = topic;
-  }
-
-  public String lowerCamelName() {
-    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
+    this.aliasKey = aliasKey;
   }
 
   public String topic() {
     return topic;
   }
 
+  public String topicAliasKey() {
+    return aliasKey + "Topic";
+  }
+
+  public String enabledKey() {
+    return aliasKey + "Enabled";
+  }
+
   public static EventTopic of(String topicString) {
     EventTopic[] topics = EventTopic.values();
     for (EventTopic topic : topics) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index baec520..97528de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -20,7 +20,6 @@
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -110,13 +109,12 @@
       Supplier<Config> config, String subsection) {
     Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
     for (EventTopic topic : EventTopic.values()) {
-      String enabledConfigKey = topic.lowerCamelName() + "Enabled";
-
       eventsEnabled.put(
           topic,
           config
               .get()
-              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
+              .getBoolean(
+                  KAFKA_SECTION, subsection, topic.enabledKey(), DEFAULT_ENABLE_PROCESSING));
     }
     return eventsEnabled;
   }
@@ -147,28 +145,16 @@
     private final Map<EventTopic, String> eventTopics;
     private final String bootstrapServers;
 
-    private static final ImmutableMap<EventTopic, String> EVENT_TOPICS =
-        ImmutableMap.of(
-            EventTopic.INDEX_TOPIC,
-            "GERRIT.EVENT.INDEX",
-            EventTopic.STREAM_EVENT_TOPIC,
-            "GERRIT.EVENT.STREAM",
-            EventTopic.CACHE_TOPIC,
-            "GERRIT.EVENT.CACHE",
-            EventTopic.PROJECT_LIST_TOPIC,
-            "GERRIT.EVENT.PROJECT.LIST");
-
     Kafka(Supplier<Config> config) {
       this.bootstrapServers =
           getString(
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
 
       this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventTopic, String> topicDefault : EVENT_TOPICS.entrySet()) {
-        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+      for (EventTopic eventTopic : EventTopic.values()) {
         eventTopics.put(
-            topicDefault.getKey(),
-            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+            eventTopic,
+            getString(config, KAFKA_SECTION, null, eventTopic.topicAliasKey(), eventTopic.topic()));
       }
     }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ba423ff..84e1dd8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -22,6 +22,7 @@
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -121,4 +122,79 @@
 
     assertThat(property).isNull();
   }
+
+  @Test
+  public void shouldReturnKafkaTopicAliasForIndexTopic() {
+    setKafkaTopicAlias("indexEventTopic", "gerrit_index");
+    final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.INDEX_TOPIC);
+
+    assertThat(property).isEqualTo("gerrit_index");
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicAliasForStreamEventTopic() {
+    setKafkaTopicAlias("streamEventTopic", "gerrit_stream_events");
+    final String property =
+        getConfiguration().getKafka().getTopicAlias(EventTopic.STREAM_EVENT_TOPIC);
+
+    assertThat(property).isEqualTo("gerrit_stream_events");
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicAliasForProjectListEventTopic() {
+    setKafkaTopicAlias("projectListEventTopic", "gerrit_project_list");
+    final String property =
+        getConfiguration().getKafka().getTopicAlias(EventTopic.PROJECT_LIST_TOPIC);
+
+    assertThat(property).isEqualTo("gerrit_project_list");
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicAliasForCacheEventTopic() {
+    setKafkaTopicAlias("cacheEventTopic", "gerrit_cache");
+    final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.CACHE_TOPIC);
+
+    assertThat(property).isEqualTo("gerrit_cache");
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicEnabledForCacheEventTopic() {
+    setKafkaTopicEnabled("cacheEventEnabled", false);
+    final Boolean property =
+        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC);
+    assertThat(property).isFalse();
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicEnabledForIndexTopic() {
+    setKafkaTopicEnabled("indexEventEnabled", false);
+    final Boolean property =
+        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC);
+    assertThat(property).isFalse();
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicEnabledForStreamEventTopic() {
+    setKafkaTopicEnabled("streamEventEnabled", false);
+    final Boolean property =
+        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC);
+    assertThat(property).isFalse();
+  }
+
+  @Test
+  public void shouldReturnKafkaTopicEnabledForProjectListEventTopic() {
+    setKafkaTopicEnabled("projectListEventEnabled", false);
+    final Boolean property =
+        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC);
+    assertThat(property).isFalse();
+  }
+
+  private void setKafkaTopicAlias(String topicKey, String topic) {
+    globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
+  }
+
+  private void setKafkaTopicEnabled(String topicEnabledKey, Boolean isEnabled) {
+    globalPluginConfig.setBoolean(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, topicEnabledKey, isEnabled);
+  }
 }