Decouple Kafka configuration class
Decouple Kafka configuration from the main Configuration logic.
KafkaConfiguration reads it's own configuration properties from
multi-site.config.
Feature: Issue 10825
Change-Id: Ib39e538b8b7374a7d4c8613cf112123518a772f0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 3764013..3226a42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Suppliers.ofInstance;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@@ -62,25 +61,18 @@
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
static final int DEFAULT_INDEX_MAX_TRIES = 2;
static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
- private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
static final String NUM_STRIPED_LOCKS = "numStripedLocks";
static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
static final String ENABLE_KEY = "enabled";
- static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
- static final boolean DEFAULT_ENABLE_PROCESSING = true;
- static final String KAFKA_SECTION = "kafka";
- public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
- private final Supplier<KafkaPublisher> publisher;
private final Supplier<Cache> cache;
private final Supplier<Event> event;
private final Supplier<Index> index;
- private final Supplier<KafkaSubscriber> subscriber;
- private final Supplier<Kafka> kafka;
private final Supplier<SharedRefDatabase> sharedRefDb;
private final Supplier<Collection<Message>> replicationConfigValidation;
private final Config multiSiteConfig;
+ private final KafkaConfiguration kafkaConfig;
@Inject
Configuration(SitePaths sitePaths) {
@@ -91,10 +83,8 @@
public Configuration(Config multiSiteConfig, Config replicationConfig) {
Supplier<Config> lazyMultiSiteCfg = lazyLoad(multiSiteConfig);
this.multiSiteConfig = multiSiteConfig;
+ this.kafkaConfig = new KafkaConfiguration(multiSiteConfig);
replicationConfigValidation = lazyValidateReplicatioConfig(replicationConfig);
- kafka = memoize(() -> new Kafka(lazyMultiSiteCfg));
- publisher = memoize(() -> new KafkaPublisher(lazyMultiSiteCfg));
- subscriber = memoize(() -> new KafkaSubscriber(lazyMultiSiteCfg));
cache = memoize(() -> new Cache(lazyMultiSiteCfg));
event = memoize(() -> new Event(lazyMultiSiteCfg));
index = memoize(() -> new Index(lazyMultiSiteCfg));
@@ -110,11 +100,11 @@
}
public Kafka getKafka() {
- return kafka.get();
+ return kafkaConfig.getKafka();
}
public KafkaPublisher kafkaPublisher() {
- return publisher.get();
+ return kafkaConfig.kafkaPublisher();
}
public Cache cache() {
@@ -130,7 +120,7 @@
}
public KafkaSubscriber kafkaSubscriber() {
- return subscriber.get();
+ return kafkaConfig.kafkaSubscriber();
}
public Collection<Message> validate() {
@@ -192,59 +182,6 @@
}
}
- private static String getString(
- Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.get().getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
-
- private static Map<EventFamily, Boolean> eventsEnabled(
- Supplier<Config> config, String subsection) {
- Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
- for (EventFamily eventFamily : EventFamily.values()) {
- String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
-
- eventsEnabled.put(
- eventFamily,
- config
- .get()
- .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
- }
- return eventsEnabled;
- }
-
- private static void applyKafkaConfig(
- Supplier<Config> configSupplier, String subsectionName, Properties target) {
- Config config = configSupplier.get();
- for (String section : config.getSubsections(KAFKA_SECTION)) {
- if (section.equals(subsectionName)) {
- for (String name : config.getNames(KAFKA_SECTION, section, true)) {
- if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
- Object value = config.getString(KAFKA_SECTION, subsectionName, name);
- String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
- String propName =
- CaseFormat.LOWER_CAMEL
- .to(CaseFormat.LOWER_HYPHEN, configProperty)
- .replaceAll("-", ".");
- log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
- target.put(propName, value);
- }
- }
- }
- }
- target.put(
- "bootstrap.servers",
- getString(
- configSupplier,
- KAFKA_SECTION,
- null,
- "bootstrapServers",
- DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
- }
-
public static class Kafka {
private final Map<EventFamily, String> eventTopics;
private final String bootstrapServers;
@@ -263,14 +200,23 @@
Kafka(Supplier<Config> config) {
this.bootstrapServers =
getString(
- config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
+ config,
+ KafkaConfiguration.KAFKA_SECTION,
+ null,
+ "bootstrapServers",
+ KafkaConfiguration.DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
this.eventTopics = new HashMap<>();
for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
eventTopics.put(
topicDefault.getKey(),
- getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+ getString(
+ config,
+ KafkaConfiguration.KAFKA_SECTION,
+ null,
+ topicConfigKey,
+ topicDefault.getValue()));
}
}
@@ -281,6 +227,15 @@
public String getBootstrapServers() {
return bootstrapServers;
}
+
+ private static String getString(
+ Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.get().getString(section, subsection, name);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ return defaultValue;
+ }
}
public static class KafkaPublisher extends Properties {
@@ -294,17 +249,20 @@
private final boolean enabled;
private final Map<EventFamily, Boolean> eventsEnabled;
- private KafkaPublisher(Supplier<Config> cfg) {
+ KafkaPublisher(Supplier<Config> cfg) {
enabled =
cfg.get()
.getBoolean(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+ KafkaConfiguration.KAFKA_SECTION,
+ KAFKA_PUBLISHER_SUBSECTION,
+ KafkaConfiguration.ENABLE_KEY,
+ DEFAULT_BROKER_ENABLED);
- eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
+ eventsEnabled = KafkaConfiguration.eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
if (enabled) {
setDefaults();
- applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+ KafkaConfiguration.applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
}
}
@@ -343,17 +301,22 @@
this.pollingInterval =
cfg.getInt(
- KAFKA_SECTION,
+ KafkaConfiguration.KAFKA_SECTION,
KAFKA_SUBSCRIBER_SUBSECTION,
"pollingIntervalMs",
- DEFAULT_POLLING_INTERVAL_MS);
+ KafkaConfiguration.DEFAULT_POLLING_INTERVAL_MS);
- enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+ enabled =
+ cfg.getBoolean(
+ KafkaConfiguration.KAFKA_SECTION,
+ KAFKA_SUBSCRIBER_SUBSECTION,
+ KafkaConfiguration.ENABLE_KEY,
+ false);
- eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
+ eventsEnabled = KafkaConfiguration.eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
if (enabled) {
- applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
+ KafkaConfiguration.applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
}
}
@@ -368,7 +331,11 @@
public Properties initPropsWith(UUID instanceId) {
String groupId =
getString(
- cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
+ cfg,
+ KafkaConfiguration.KAFKA_SECTION,
+ KAFKA_SUBSCRIBER_SUBSECTION,
+ "groupId",
+ instanceId.toString());
this.put("group.id", groupId);
return this;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
new file mode 100644
index 0000000..3376b49
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
@@ -0,0 +1,145 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.ofInstance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Kafka;
+import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
+import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaConfiguration {
+ private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
+
+ static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+ static final String KAFKA_SECTION = "kafka";
+ static final String ENABLE_KEY = "enabled";
+ static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+ static final boolean DEFAULT_ENABLE_PROCESSING = true;
+ static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
+
+ private final Supplier<KafkaSubscriber> subscriber;
+ private final Supplier<Kafka> kafka;
+ private final Supplier<KafkaPublisher> publisher;
+
+ @VisibleForTesting
+ public KafkaConfiguration(Config kafkaConfig) {
+ Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+ kafka = memoize(() -> new Kafka(lazyCfg));
+ publisher = memoize(() -> new KafkaPublisher(lazyCfg));
+ subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+ }
+
+ public Kafka getKafka() {
+ return kafka.get();
+ }
+
+ public KafkaSubscriber kafkaSubscriber() {
+ return subscriber.get();
+ }
+
+ static void applyKafkaConfig(
+ Supplier<Config> configSupplier, String subsectionName, Properties target) {
+ Config config = configSupplier.get();
+ for (String section : config.getSubsections(KAFKA_SECTION)) {
+ if (section.equals(subsectionName)) {
+ for (String name : config.getNames(KAFKA_SECTION, section, true)) {
+ if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
+ Object value = config.getString(KAFKA_SECTION, subsectionName, name);
+ String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
+ String propName =
+ CaseFormat.LOWER_CAMEL
+ .to(CaseFormat.LOWER_HYPHEN, configProperty)
+ .replaceAll("-", ".");
+ log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
+ target.put(propName, value);
+ }
+ }
+ }
+ }
+ target.put(
+ "bootstrap.servers",
+ getString(
+ configSupplier,
+ KAFKA_SECTION,
+ null,
+ "bootstrapServers",
+ DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+ }
+
+ private static String getString(
+ Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.get().getString(section, subsection, name);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ return defaultValue;
+ }
+
+ static Map<EventFamily, Boolean> eventsEnabled(Supplier<Config> config, String subsection) {
+ Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
+ for (EventFamily eventFamily : EventFamily.values()) {
+ String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+
+ eventsEnabled.put(
+ eventFamily,
+ config
+ .get()
+ .getBoolean(
+ KafkaConfiguration.KAFKA_SECTION,
+ subsection,
+ enabledConfigKey,
+ KafkaConfiguration.DEFAULT_ENABLE_PROCESSING));
+ }
+ return eventsEnabled;
+ }
+
+ public KafkaPublisher kafkaPublisher() {
+ return publisher.get();
+ }
+
+ private Supplier<Config> lazyLoad(Config config) {
+ if (config instanceof FileBasedConfig) {
+ return memoize(
+ () -> {
+ FileBasedConfig fileConfig = (FileBasedConfig) config;
+ String fileConfigFileName = fileConfig.getFile().getPath();
+ try {
+ log.info("Loading configuration from {}", fileConfigFileName);
+ fileConfig.load();
+ } catch (IOException | ConfigInvalidException e) {
+ log.error("Unable to load configuration from " + fileConfigFileName, e);
+ }
+ return fileConfig;
+ });
+ }
+ return ofInstance(config);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 4ceead9..ec3a431 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -23,11 +23,11 @@
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
import com.google.common.collect.ImmutableList;
import org.eclipse.jgit.lib.Config;