Decouple Kafka configuration into its own class

Decouple Kafka configuration from the main Configuration logic.
The new KafkaConfiguration class reads its own configuration properties from
multi-site.config.
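
For reference, a minimal sketch of the kafka section that
KafkaConfiguration now owns. Subsection names and values are
illustrative only; they correspond to the KAFKA_PUBLISHER_SUBSECTION /
KAFKA_SUBSCRIBER_SUBSECTION constants and the KafkaProp- prefix
handled in the code below:

  [kafka]
    bootstrapServers = localhost:9092
  [kafka "publisher"]
    enabled = true
  [kafka "subscriber"]
    enabled = true
    pollingIntervalMs = 1000
    # KafkaProp-* keys are passed through to the Kafka client,
    # e.g. this one becomes auto.commit.interval.ms = 5000
    KafkaProp-autoCommitIntervalMs = 5000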

Feature: Issue 10825
Change-Id: Ib39e538b8b7374a7d4c8613cf112123518a772f0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 3764013..3226a42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,7 +18,6 @@
 import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
@@ -62,25 +61,18 @@
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
   static final int DEFAULT_INDEX_MAX_TRIES = 2;
   static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
-  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
   static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
   static final String ENABLE_KEY = "enabled";
-  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  static final boolean DEFAULT_ENABLE_PROCESSING = true;
-  static final String KAFKA_SECTION = "kafka";
-  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
 
-  private final Supplier<KafkaPublisher> publisher;
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
-  private final Supplier<KafkaSubscriber> subscriber;
-  private final Supplier<Kafka> kafka;
   private final Supplier<SharedRefDatabase> sharedRefDb;
   private final Supplier<Collection<Message>> replicationConfigValidation;
   private final Config multiSiteConfig;
+  private final KafkaConfiguration kafkaConfig;
 
   @Inject
   Configuration(SitePaths sitePaths) {
@@ -91,10 +83,8 @@
   public Configuration(Config multiSiteConfig, Config replicationConfig) {
     Supplier<Config> lazyMultiSiteCfg = lazyLoad(multiSiteConfig);
     this.multiSiteConfig = multiSiteConfig;
+    this.kafkaConfig = new KafkaConfiguration(multiSiteConfig);
     replicationConfigValidation = lazyValidateReplicatioConfig(replicationConfig);
-    kafka = memoize(() -> new Kafka(lazyMultiSiteCfg));
-    publisher = memoize(() -> new KafkaPublisher(lazyMultiSiteCfg));
-    subscriber = memoize(() -> new KafkaSubscriber(lazyMultiSiteCfg));
     cache = memoize(() -> new Cache(lazyMultiSiteCfg));
     event = memoize(() -> new Event(lazyMultiSiteCfg));
     index = memoize(() -> new Index(lazyMultiSiteCfg));
@@ -110,11 +100,11 @@
   }
 
   public Kafka getKafka() {
-    return kafka.get();
+    return kafkaConfig.getKafka();
   }
 
   public KafkaPublisher kafkaPublisher() {
-    return publisher.get();
+    return kafkaConfig.kafkaPublisher();
   }
 
   public Cache cache() {
@@ -130,7 +120,7 @@
   }
 
   public KafkaSubscriber kafkaSubscriber() {
-    return subscriber.get();
+    return kafkaConfig.kafkaSubscriber();
   }
 
   public Collection<Message> validate() {
@@ -192,59 +182,6 @@
     }
   }
 
-  private static String getString(
-      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
-    String value = cfg.get().getString(section, subsection, name);
-    if (!Strings.isNullOrEmpty(value)) {
-      return value;
-    }
-    return defaultValue;
-  }
-
-  private static Map<EventFamily, Boolean> eventsEnabled(
-      Supplier<Config> config, String subsection) {
-    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
-    for (EventFamily eventFamily : EventFamily.values()) {
-      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
-
-      eventsEnabled.put(
-          eventFamily,
-          config
-              .get()
-              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
-    }
-    return eventsEnabled;
-  }
-
-  private static void applyKafkaConfig(
-      Supplier<Config> configSupplier, String subsectionName, Properties target) {
-    Config config = configSupplier.get();
-    for (String section : config.getSubsections(KAFKA_SECTION)) {
-      if (section.equals(subsectionName)) {
-        for (String name : config.getNames(KAFKA_SECTION, section, true)) {
-          if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
-            Object value = config.getString(KAFKA_SECTION, subsectionName, name);
-            String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
-            String propName =
-                CaseFormat.LOWER_CAMEL
-                    .to(CaseFormat.LOWER_HYPHEN, configProperty)
-                    .replaceAll("-", ".");
-            log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
-            target.put(propName, value);
-          }
-        }
-      }
-    }
-    target.put(
-        "bootstrap.servers",
-        getString(
-            configSupplier,
-            KAFKA_SECTION,
-            null,
-            "bootstrapServers",
-            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
-  }
-
   public static class Kafka {
     private final Map<EventFamily, String> eventTopics;
     private final String bootstrapServers;
@@ -263,14 +200,23 @@
     Kafka(Supplier<Config> config) {
       this.bootstrapServers =
           getString(
-              config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
+              config,
+              KafkaConfiguration.KAFKA_SECTION,
+              null,
+              "bootstrapServers",
+              KafkaConfiguration.DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
 
       this.eventTopics = new HashMap<>();
       for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
         String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
         eventTopics.put(
             topicDefault.getKey(),
-            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+            getString(
+                config,
+                KafkaConfiguration.KAFKA_SECTION,
+                null,
+                topicConfigKey,
+                topicDefault.getValue()));
       }
     }
 
@@ -281,6 +227,15 @@
     public String getBootstrapServers() {
       return bootstrapServers;
     }
+
+    private static String getString(
+        Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.get().getString(section, subsection, name);
+      if (!Strings.isNullOrEmpty(value)) {
+        return value;
+      }
+      return defaultValue;
+    }
   }
 
   public static class KafkaPublisher extends Properties {
@@ -294,17 +249,20 @@
     private final boolean enabled;
     private final Map<EventFamily, Boolean> eventsEnabled;
 
-    private KafkaPublisher(Supplier<Config> cfg) {
+    KafkaPublisher(Supplier<Config> cfg) {
       enabled =
           cfg.get()
               .getBoolean(
-                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+                  KafkaConfiguration.KAFKA_SECTION,
+                  KAFKA_PUBLISHER_SUBSECTION,
+                  KafkaConfiguration.ENABLE_KEY,
+                  DEFAULT_BROKER_ENABLED);
 
-      eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
+      eventsEnabled = KafkaConfiguration.eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
 
       if (enabled) {
         setDefaults();
-        applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+        KafkaConfiguration.applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
       }
     }
 
@@ -343,17 +301,22 @@
 
       this.pollingInterval =
           cfg.getInt(
-              KAFKA_SECTION,
+              KafkaConfiguration.KAFKA_SECTION,
               KAFKA_SUBSCRIBER_SUBSECTION,
               "pollingIntervalMs",
-              DEFAULT_POLLING_INTERVAL_MS);
+              KafkaConfiguration.DEFAULT_POLLING_INTERVAL_MS);
 
-      enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+      enabled =
+          cfg.getBoolean(
+              KafkaConfiguration.KAFKA_SECTION,
+              KAFKA_SUBSCRIBER_SUBSECTION,
+              KafkaConfiguration.ENABLE_KEY,
+              false);
 
-      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
+      eventsEnabled = KafkaConfiguration.eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
 
       if (enabled) {
-        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
+        KafkaConfiguration.applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
       }
     }
 
@@ -368,7 +331,11 @@
     public Properties initPropsWith(UUID instanceId) {
       String groupId =
           getString(
-              cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
+              cfg,
+              KafkaConfiguration.KAFKA_SECTION,
+              KAFKA_SUBSCRIBER_SUBSECTION,
+              "groupId",
+              instanceId.toString());
       this.put("group.id", groupId);
 
       return this;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
new file mode 100644
index 0000000..3376b49
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
@@ -0,0 +1,145 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.ofInstance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Kafka;
+import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
+import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConfiguration {
+  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
+
+  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  static final String KAFKA_SECTION = "kafka";
+  static final String ENABLE_KEY = "enabled";
+  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+  static final boolean DEFAULT_ENABLE_PROCESSING = true;
+  static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
+
+  private final Supplier<KafkaSubscriber> subscriber;
+  private final Supplier<Kafka> kafka;
+  private final Supplier<KafkaPublisher> publisher;
+
+  @VisibleForTesting
+  public KafkaConfiguration(Config kafkaConfig) {
+    Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+    kafka = memoize(() -> new Kafka(lazyCfg));
+    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
+    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+  }
+
+  public Kafka getKafka() {
+    return kafka.get();
+  }
+
+  public KafkaSubscriber kafkaSubscriber() {
+    return subscriber.get();
+  }
+
+  static void applyKafkaConfig(
+      Supplier<Config> configSupplier, String subsectionName, Properties target) {
+    Config config = configSupplier.get();
+    for (String section : config.getSubsections(KAFKA_SECTION)) {
+      if (section.equals(subsectionName)) {
+        for (String name : config.getNames(KAFKA_SECTION, section, true)) {
+          if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
+            Object value = config.getString(KAFKA_SECTION, subsectionName, name);
+            String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
+            String propName =
+                CaseFormat.LOWER_CAMEL
+                    .to(CaseFormat.LOWER_HYPHEN, configProperty)
+                    .replaceAll("-", ".");
+            log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
+            target.put(propName, value);
+          }
+        }
+      }
+    }
+    target.put(
+        "bootstrap.servers",
+        getString(
+            configSupplier,
+            KAFKA_SECTION,
+            null,
+            "bootstrapServers",
+            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+  }
+
+  private static String getString(
+      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+    String value = cfg.get().getString(section, subsection, name);
+    if (!Strings.isNullOrEmpty(value)) {
+      return value;
+    }
+    return defaultValue;
+  }
+
+  static Map<EventFamily, Boolean> eventsEnabled(Supplier<Config> config, String subsection) {
+    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
+    for (EventFamily eventFamily : EventFamily.values()) {
+      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+
+      eventsEnabled.put(
+          eventFamily,
+          config
+              .get()
+              .getBoolean(
+                  KafkaConfiguration.KAFKA_SECTION,
+                  subsection,
+                  enabledConfigKey,
+                  KafkaConfiguration.DEFAULT_ENABLE_PROCESSING));
+    }
+    return eventsEnabled;
+  }
+
+  public KafkaPublisher kafkaPublisher() {
+    return publisher.get();
+  }
+
+  private Supplier<Config> lazyLoad(Config config) {
+    if (config instanceof FileBasedConfig) {
+      return memoize(
+          () -> {
+            FileBasedConfig fileConfig = (FileBasedConfig) config;
+            String fileConfigFileName = fileConfig.getFile().getPath();
+            try {
+              log.info("Loading configuration from {}", fileConfigFileName);
+              fileConfig.load();
+            } catch (IOException | ConfigInvalidException e) {
+              log.error("Unable to load configuration from " + fileConfigFileName, e);
+            }
+            return fileConfig;
+          });
+    }
+    return ofInstance(config);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 4ceead9..ec3a431 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -23,11 +23,11 @@
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
 
 import com.google.common.collect.ImmutableList;
 import org.eclipse.jgit.lib.Config;