Introduce InitStep
Help the user to set configurable parameters for the events-kafka
plugin. This gives visibility on what is actually configurable and
makes evident what defaults are.
Bug: Issue 14942
Change-Id: I58815a6b02524fc07bb57808981cda9bf261383e
diff --git a/BUILD b/BUILD
index c0eab0c..1604d58 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
srcs = glob(["src/main/java/**/*.java"]),
manifest_entries = [
"Gerrit-PluginName: events-kafka",
+ "Gerrit-InitStep: com.googlesource.gerrit.plugins.kafka.InitConfig",
"Gerrit-Module: com.googlesource.gerrit.plugins.kafka.Module",
"Implementation-Title: Gerrit Apache Kafka plugin",
"Implementation-URL: https://gerrit.googlesource.com/plugins/events-kafka",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
new file mode 100644
index 0000000..0a99fcb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_POLLING_INTERVAL_MS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+ private static final String GROUP_ID_FIELD = "groupId";
+ private static final String POLLING_INTERVAL_FIELD = "pollingIntervalMs";
+ private static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+
+ private final Section pluginSection;
+ private final String pluginName;
+ private final ConsoleUI ui;
+ private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+ @Inject
+ InitConfig(
+ Section.Factory sections,
+ @PluginName String pluginName,
+ GerritInstanceIdProvider gerritInstanceIdProvider,
+ ConsoleUI ui) {
+ this.pluginName = pluginName;
+ this.ui = ui;
+ this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+ this.pluginSection = sections.get("plugin", pluginName);
+ }
+
+ @Override
+ public void run() throws Exception {
+ ui.header(String.format("%s plugin", pluginName));
+
+ boolean sendStreamEvents = ui.yesno(SEND_STREAM_EVENTS_DEFAULT, "Should send stream events?");
+ pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+ if (sendStreamEvents) {
+ pluginSection.string(
+ "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+ }
+
+ boolean sendAsync = ui.yesno(SEND_ASYNC_DEFAULT, "Should send messages asynchronously?");
+ pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+
+ pluginSection.string(
+ "Polling interval (ms)", POLLING_INTERVAL_FIELD, DEFAULT_POLLING_INTERVAL_MS);
+
+ pluginSection.string(
+ "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+ String consumerGroup =
+ pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+ while (Strings.isNullOrEmpty(consumerGroup) && !ui.isBatch()) {
+ ui.message("'%s' is mandatory. Please specify a value.", GROUP_ID_FIELD);
+ consumerGroup =
+ pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+ }
+
+ if (Strings.isNullOrEmpty(consumerGroup) && ui.isBatch()) {
+ System.err.printf(
+ "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+ pluginName, GROUP_ID_FIELD, pluginName);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index d7be825..0c58c1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -30,6 +30,13 @@
@Singleton
public class KafkaProperties extends java.util.Properties {
private static final long serialVersionUID = 0L;
+ public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+ public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+ public static final String SEND_ASYNC_FIELD = "sendAsync";
+
+ public static final Boolean SEND_STREAM_EVENTS_DEFAULT = false;
+ public static final String STREAM_EVENTS_TOPIC_DEFAULT = "gerrit";
+ public static final Boolean SEND_ASYNC_DEFAULT = true;
public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
@@ -42,9 +49,10 @@
super();
setDefaults();
PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
- topic = fromGerritConfig.getString("topic", "gerrit");
- sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
- sendStreamEvents = fromGerritConfig.getBoolean("sendStreamEvents", false);
+ topic = fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+ sendAsync = fromGerritConfig.getBoolean(SEND_ASYNC_FIELD, SEND_ASYNC_DEFAULT);
+ sendStreamEvents =
+ fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
applyConfig(fromGerritConfig);
initDockerizedKafkaServer();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 52d4726..94e8e62 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -23,8 +23,8 @@
@Singleton
public class KafkaSubscriberProperties extends KafkaProperties {
private static final long serialVersionUID = 1L;
- private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
- private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ public static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+ public static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
private final Integer pollingInterval;
private final String groupId;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5008888..a10004f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -65,3 +65,20 @@
`plugin.@PLUGIN@.sendStreamEvents`
: Whether to send stream events to the `topic` topic.
Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```shell
+*** events-kafka plugin
+***
+
+Should send stream events? [y/N]? y
+Stream events topic [gerrit]: gerrit_stream_events
+Should send messages asynchronously? [Y/n]? y
+Polling interval (ms) [1000]: 3000
+Number of subscribers [6]: 6
+Consumer group [my_group_id]: my_group_id
+```