Kafka module should be responsible for reading its own configuration
Allow kafka module to read its own configuration from multi-site.config
file. This separation allows to move kafka related code to
a separate plugin plus it will make easy to introduce new config file for
kafka plugin.
Feature: Issue 10825
Change-Id: I5988a8678cfce78d89bbda48fac5c05950ea6377
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index b6c01f3..0857334 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -15,24 +15,21 @@
package com.googlesource.gerrit.plugins.multisite.kafka;
import static com.google.common.base.Suppliers.memoize;
-import static com.google.common.base.Suppliers.ofInstance;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,11 +47,11 @@
private final Supplier<KafkaPublisher> publisher;
@Inject
- public KafkaConfiguration(Configuration configuration) {
- Supplier<Config> lazyCfg = lazyLoad(configuration.getMultiSiteConfig());
- kafka = memoize(() -> new Kafka(lazyCfg));
- publisher = memoize(() -> new KafkaPublisher(lazyCfg));
- subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+ public KafkaConfiguration(PluginConfigFactory configFactory, @PluginName String pluginName) {
+ Config cfg = configFactory.getGlobalPluginConfig(pluginName);
+ kafka = memoize(() -> new Kafka(cfg));
+ publisher = memoize(() -> new KafkaPublisher(cfg));
+ subscriber = memoize(() -> new KafkaSubscriber(cfg));
}
public Kafka getKafka() {
@@ -65,9 +62,7 @@
return subscriber.get();
}
- private static void applyKafkaConfig(
- Supplier<Config> configSupplier, String subsectionName, Properties target) {
- Config config = configSupplier.get();
+ private static void applyKafkaConfig(Config config, String subsectionName, Properties target) {
for (String section : config.getSubsections(KAFKA_SECTION)) {
if (section.equals(subsectionName)) {
for (String name : config.getNames(KAFKA_SECTION, section, true)) {
@@ -87,16 +82,12 @@
target.put(
"bootstrap.servers",
getString(
- configSupplier,
- KAFKA_SECTION,
- null,
- "bootstrapServers",
- DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+ config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
}
private static String getString(
- Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.get().getString(section, subsection, name);
+ Config cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.getString(section, subsection, name);
if (!Strings.isNullOrEmpty(value)) {
return value;
}
@@ -107,29 +98,11 @@
return publisher.get();
}
- private Supplier<Config> lazyLoad(Config config) {
- if (config instanceof FileBasedConfig) {
- return memoize(
- () -> {
- FileBasedConfig fileConfig = (FileBasedConfig) config;
- String fileConfigFileName = fileConfig.getFile().getPath();
- try {
- log.info("Loading configuration from {}", fileConfigFileName);
- fileConfig.load();
- } catch (IOException | ConfigInvalidException e) {
- log.error("Unable to load configuration from " + fileConfigFileName, e);
- }
- return fileConfig;
- });
- }
- return ofInstance(config);
- }
-
public static class Kafka {
private final Map<EventTopic, String> eventTopics;
private final String bootstrapServers;
- Kafka(Supplier<Config> config) {
+ Kafka(Config config) {
this.bootstrapServers =
getString(
config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
@@ -151,8 +124,8 @@
}
private static String getString(
- Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.get().getString(section, subsection, name);
+ Config cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.getString(section, subsection, name);
if (!Strings.isNullOrEmpty(value)) {
return value;
}
@@ -166,7 +139,7 @@
public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
- private KafkaPublisher(Supplier<Config> kafkaConfig) {
+ private KafkaPublisher(Config kafkaConfig) {
setDefaults();
applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
}
@@ -191,8 +164,8 @@
private final Integer pollingInterval;
private final Config cfg;
- public KafkaSubscriber(Supplier<Config> kafkaCfg) {
- this.cfg = kafkaCfg.get();
+ public KafkaSubscriber(Config kafkaCfg) {
+ this.cfg = kafkaCfg;
this.pollingInterval =
cfg.getInt(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index 6d8c60e..2d266c2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -18,36 +18,38 @@
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import static org.mockito.Mockito.when;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.google.gerrit.server.config.PluginConfigFactory;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class KafkaConfigurationTest {
- private Config globalPluginConfig;
- private Configuration multiSiteConfig;
+ private Config kafkaConfig;
+ @Mock PluginConfigFactory configFactory;
@Before
public void setup() {
- globalPluginConfig = new Config();
- multiSiteConfig = new Configuration(globalPluginConfig, new Config());
+ kafkaConfig = new Config();
+ when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
}
private KafkaConfiguration getConfiguration() {
- return new KafkaConfiguration(multiSiteConfig);
+ return new KafkaConfiguration(configFactory, "multi-site");
}
@Test
public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
final String kafkaPropertyName = "fooBarBaz";
final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setString(
+ kafkaConfig.setString(
KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
@@ -59,7 +61,7 @@
public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
final String kafkaPropertyName = "fooBarBaz";
final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setString(
+ kafkaConfig.setString(
KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
@@ -102,6 +104,6 @@
}
private void setKafkaTopicAlias(String topicKey, String topic) {
- globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
+ kafkaConfig.setString(KAFKA_SECTION, null, topicKey, topic);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 4dc915d..b13c7d9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -56,6 +56,7 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
import java.io.IOException;
import java.util.ArrayList;
@@ -136,9 +137,7 @@
this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
this.pluginModule =
new PluginModule(
- multiSiteConfig,
- new ZkValidationModule(zookeeperConfig),
- new KafkaBrokerModule());
+ multiSiteConfig, new ZkValidationModule(zookeeperConfig), new KafkaBrokerModule());
this.gitModule = new GitModule(multiSiteConfig);
}
@@ -162,10 +161,14 @@
KafkaContainer kafkaContainer = new KafkaContainer();
kafkaContainer.start();
- config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
- config.save();
- Configuration multiSiteConfig = new Configuration(config, new Config());
- bind(Configuration.class).toInstance(multiSiteConfig);
+ Config kafkaConfig = new Config();
+ kafkaConfig.setString(
+ "kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
+
+ PluginConfigFactory configFactory = mock(PluginConfigFactory.class);
+ when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
+ KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configFactory, "multi-site");
+ bind(KafkaConfiguration.class).toInstance(kafkaConfiguration);
listener().toInstance(new KafkaStopAtShutdown(kafkaContainer));