Kafka module should be responsible for reading its own configuration

Allow the Kafka module to read its own configuration from the
multi-site.config file. This separation makes it possible to move the
Kafka-related code into a separate plugin, and it also makes it easy to
introduce a dedicated config file for the Kafka plugin later.
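
For reference, the layout of the Kafka section read from multi-site.config
is unchanged; a minimal sketch (the broker address below is illustrative
only, not a default shipped by the plugin):

  [kafka]
    bootstrapServers = broker-1:9092

  [kafka "publisher"]
    # publisher-specific Kafka client properties go here

Subscriber settings live under the analogous subscriber subsection
(KAFKA_SUBSCRIBER_SUBSECTION).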

Feature: Issue 10825
Change-Id: I5988a8678cfce78d89bbda48fac5c05950ea6377
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index b6c01f3..0857334 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -15,24 +15,21 @@
 package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.base.Suppliers.memoize;
-import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +47,11 @@
   private final Supplier<KafkaPublisher> publisher;
 
   @Inject
-  public KafkaConfiguration(Configuration configuration) {
-    Supplier<Config> lazyCfg = lazyLoad(configuration.getMultiSiteConfig());
-    kafka = memoize(() -> new Kafka(lazyCfg));
-    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
-    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+  public KafkaConfiguration(PluginConfigFactory configFactory, @PluginName String pluginName) {
+    Config cfg = configFactory.getGlobalPluginConfig(pluginName);
+    kafka = memoize(() -> new Kafka(cfg));
+    publisher = memoize(() -> new KafkaPublisher(cfg));
+    subscriber = memoize(() -> new KafkaSubscriber(cfg));
   }
 
   public Kafka getKafka() {
@@ -65,9 +62,7 @@
     return subscriber.get();
   }
 
-  private static void applyKafkaConfig(
-      Supplier<Config> configSupplier, String subsectionName, Properties target) {
-    Config config = configSupplier.get();
+  private static void applyKafkaConfig(Config config, String subsectionName, Properties target) {
     for (String section : config.getSubsections(KAFKA_SECTION)) {
       if (section.equals(subsectionName)) {
         for (String name : config.getNames(KAFKA_SECTION, section, true)) {
@@ -87,16 +82,12 @@
     target.put(
         "bootstrap.servers",
         getString(
-            configSupplier,
-            KAFKA_SECTION,
-            null,
-            "bootstrapServers",
-            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+            config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
   }
 
   private static String getString(
-      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
-    String value = cfg.get().getString(section, subsection, name);
+      Config cfg, String section, String subsection, String name, String defaultValue) {
+    String value = cfg.getString(section, subsection, name);
     if (!Strings.isNullOrEmpty(value)) {
       return value;
     }
@@ -107,29 +98,11 @@
     return publisher.get();
   }
 
-  private Supplier<Config> lazyLoad(Config config) {
-    if (config instanceof FileBasedConfig) {
-      return memoize(
-          () -> {
-            FileBasedConfig fileConfig = (FileBasedConfig) config;
-            String fileConfigFileName = fileConfig.getFile().getPath();
-            try {
-              log.info("Loading configuration from {}", fileConfigFileName);
-              fileConfig.load();
-            } catch (IOException | ConfigInvalidException e) {
-              log.error("Unable to load configuration from " + fileConfigFileName, e);
-            }
-            return fileConfig;
-          });
-    }
-    return ofInstance(config);
-  }
-
   public static class Kafka {
     private final Map<EventTopic, String> eventTopics;
     private final String bootstrapServers;
 
-    Kafka(Supplier<Config> config) {
+    Kafka(Config config) {
       this.bootstrapServers =
           getString(
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
@@ -151,8 +124,8 @@
     }
 
     private static String getString(
-        Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.get().getString(section, subsection, name);
+        Config cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.getString(section, subsection, name);
       if (!Strings.isNullOrEmpty(value)) {
         return value;
       }
@@ -166,7 +139,7 @@
     public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
     public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
 
-    private KafkaPublisher(Supplier<Config> kafkaConfig) {
+    private KafkaPublisher(Config kafkaConfig) {
       setDefaults();
       applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
     }
@@ -191,8 +164,8 @@
     private final Integer pollingInterval;
     private final Config cfg;
 
-    public KafkaSubscriber(Supplier<Config> kafkaCfg) {
-      this.cfg = kafkaCfg.get();
+    public KafkaSubscriber(Config kafkaCfg) {
+      this.cfg = kafkaCfg;
 
       this.pollingInterval =
           cfg.getInt(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index 6d8c60e..2d266c2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -18,36 +18,38 @@
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import static org.mockito.Mockito.when;
 
-import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.google.gerrit.server.config.PluginConfigFactory;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaConfigurationTest {
 
-  private Config globalPluginConfig;
-  private Configuration multiSiteConfig;
+  private Config kafkaConfig;
+  @Mock PluginConfigFactory configFactory;
 
   @Before
   public void setup() {
-    globalPluginConfig = new Config();
-    multiSiteConfig = new Configuration(globalPluginConfig, new Config());
+    kafkaConfig = new Config();
+    when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
   }
 
   private KafkaConfiguration getConfiguration() {
-    return new KafkaConfiguration(multiSiteConfig);
+    return new KafkaConfiguration(configFactory, "multi-site");
   }
 
   @Test
   public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
     final String kafkaPropertyName = "fooBarBaz";
     final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setString(
+    kafkaConfig.setString(
         KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
 
     final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
@@ -59,7 +61,7 @@
   public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
     final String kafkaPropertyName = "fooBarBaz";
     final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setString(
+    kafkaConfig.setString(
         KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
 
     final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
@@ -102,6 +104,6 @@
   }
 
   private void setKafkaTopicAlias(String topicKey, String topic) {
-    globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
+    kafkaConfig.setString(KAFKA_SECTION, null, topicKey, topic);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 4dc915d..b13c7d9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -56,6 +56,7 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -136,9 +137,7 @@
       this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
       this.pluginModule =
           new PluginModule(
-              multiSiteConfig,
-              new ZkValidationModule(zookeeperConfig),
-              new KafkaBrokerModule());
+              multiSiteConfig, new ZkValidationModule(zookeeperConfig), new KafkaBrokerModule());
       this.gitModule = new GitModule(multiSiteConfig);
     }
 
@@ -162,10 +161,14 @@
       KafkaContainer kafkaContainer = new KafkaContainer();
       kafkaContainer.start();
 
-      config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
-      config.save();
-      Configuration multiSiteConfig = new Configuration(config, new Config());
-      bind(Configuration.class).toInstance(multiSiteConfig);
+      Config kafkaConfig = new Config();
+      kafkaConfig.setString(
+          "kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
+
+      PluginConfigFactory configFactory = mock(PluginConfigFactory.class);
+      when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
+      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configFactory, "multi-site");
+      bind(KafkaConfiguration.class).toInstance(kafkaConfiguration);
 
       listener().toInstance(new KafkaStopAtShutdown(kafkaContainer));