Introduce InitStep

Help the user to set configurable parameters for the events-kafka
plugin. This gives visibility on what is actually configurable and
makes evident what defaults are.

Bug: Issue 14942
Change-Id: I58815a6b02524fc07bb57808981cda9bf261383e
diff --git a/BUILD b/BUILD
index c0eab0c..1604d58 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
     srcs = glob(["src/main/java/**/*.java"]),
     manifest_entries = [
         "Gerrit-PluginName: events-kafka",
+        "Gerrit-InitStep: com.googlesource.gerrit.plugins.kafka.InitConfig",
         "Gerrit-Module: com.googlesource.gerrit.plugins.kafka.Module",
         "Implementation-Title: Gerrit Apache Kafka plugin",
         "Implementation-URL: https://gerrit.googlesource.com/plugins/events-kafka",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
new file mode 100644
index 0000000..0a99fcb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_POLLING_INTERVAL_MS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+  private static final String GROUP_ID_FIELD = "groupId";
+  private static final String POLLING_INTERVAL_FIELD = "pollingIntervalMs";
+  private static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+
+  private final Section pluginSection;
+  private final String pluginName;
+  private final ConsoleUI ui;
+  private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+  @Inject
+  InitConfig(
+      Section.Factory sections,
+      @PluginName String pluginName,
+      GerritInstanceIdProvider gerritInstanceIdProvider,
+      ConsoleUI ui) {
+    this.pluginName = pluginName;
+    this.ui = ui;
+    this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+    this.pluginSection = sections.get("plugin", pluginName);
+  }
+
+  @Override
+  public void run() throws Exception {
+    ui.header(String.format("%s plugin", pluginName));
+
+    boolean sendStreamEvents = ui.yesno(SEND_STREAM_EVENTS_DEFAULT, "Should send stream events?");
+    pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+    if (sendStreamEvents) {
+      pluginSection.string(
+          "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+    }
+
+    boolean sendAsync = ui.yesno(SEND_ASYNC_DEFAULT, "Should send messages asynchronously?");
+    pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+
+    pluginSection.string(
+        "Polling interval (ms)", POLLING_INTERVAL_FIELD, DEFAULT_POLLING_INTERVAL_MS);
+
+    pluginSection.string(
+        "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+    String consumerGroup =
+        pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    while (Strings.isNullOrEmpty(consumerGroup) && !ui.isBatch()) {
+      ui.message("'%s' is mandatory. Please specify a value.", GROUP_ID_FIELD);
+      consumerGroup =
+          pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    }
+
+    if (Strings.isNullOrEmpty(consumerGroup) && ui.isBatch()) {
+      System.err.printf(
+          "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+          pluginName, GROUP_ID_FIELD, pluginName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index d7be825..0c58c1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -30,6 +30,13 @@
 @Singleton
 public class KafkaProperties extends java.util.Properties {
   private static final long serialVersionUID = 0L;
+  public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+  public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+  public static final String SEND_ASYNC_FIELD = "sendAsync";
+
+  public static final Boolean SEND_STREAM_EVENTS_DEFAULT = false;
+  public static final String STREAM_EVENTS_TOPIC_DEFAULT = "gerrit";
+  public static final Boolean SEND_ASYNC_DEFAULT = true;
 
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
@@ -42,9 +49,10 @@
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    topic = fromGerritConfig.getString("topic", "gerrit");
-    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
-    sendStreamEvents = fromGerritConfig.getBoolean("sendStreamEvents", false);
+    topic = fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+    sendAsync = fromGerritConfig.getBoolean(SEND_ASYNC_FIELD, SEND_ASYNC_DEFAULT);
+    sendStreamEvents =
+        fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 52d4726..94e8e62 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -23,8 +23,8 @@
 @Singleton
 public class KafkaSubscriberProperties extends KafkaProperties {
   private static final long serialVersionUID = 1L;
-  private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  public static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+  public static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
 
   private final Integer pollingInterval;
   private final String groupId;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5008888..a10004f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -65,3 +65,20 @@
 `plugin.@PLUGIN@.sendStreamEvents`
 :   Whether to send stream events to the `topic` topic.
     Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```shell
+*** events-kafka plugin
+***
+
+Should send stream events?     [y/N]? y
+Stream events topic            [gerrit]: gerrit_stream_events
+Should send messages asynchronously? [Y/n]? y
+Polling interval (ms)          [1000]: 3000
+Number of subscribers          [6]: 6
+Consumer group                 [my_group_id]: my_group_id
+```