Introduce InitStep Help the admin to set configurable parameters for the events-aws-kinesis plugin. This gives visibility on what is actually configurable and makes evident what defaults are. Bug: Issue 14953 Change-Id: I18bc370341c59c33ccd9d92d07c8cbc1f72f5d8d
diff --git a/BUILD b/BUILD index 8fdbdc3..3456a2e 100644 --- a/BUILD +++ b/BUILD
@@ -12,6 +12,7 @@ srcs = glob(["src/main/java/**/*.java"]), manifest_entries = [ "Gerrit-PluginName: events-aws-kinesis", + "Gerrit-InitStep: com.googlesource.gerrit.plugins.kinesis.InitConfig", "Gerrit-Module: com.googlesource.gerrit.plugins.kinesis.Module", "Implementation-Title: Gerrit events listener to send events to AWS Kinesis broker", "Implementation-URL: https://gerrit.googlesource.com/plugins/events-aws-kinesis",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java index a12cca3..2a15d2c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -30,17 +30,33 @@ @Singleton class Configuration { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6"; - private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit"; - private static final String DEFAULT_INITIAL_POSITION = "latest"; - private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L; - private static final Integer DEFAULT_MAX_RECORDS = 100; - private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L; - private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L; - private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L; - private static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN; - private static final Boolean DEFAULT_SEND_ASYNC = true; - private static final Boolean DEFAULT_SEND_STREAM_EVENTS = false; + + static final String REGION_FIELD = "region"; + static final String ENDPOINT_FIELD = "endpoint"; + static final String STREAM_EVENTS_TOPIC_FIELD = "topic"; + static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers"; + static final String APPLICATION_NAME_FIELD = "applicationName"; + static final String INITIAL_POSITION_FIELD = "initialPosition"; + static final String POLLING_INTERVAL_MS_FIELD = "pollingIntervalMs"; + static final String MAX_RECORDS_FIELD = "maxRecords"; + static final String PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD = "publishSingleRequestTimeoutMs"; + static final String PUBLISH_TIMEOUT_MS_FIELD = "publishTimeoutMs"; + static final String SHUTDOWN_MS_FIELD = "shutdownTimeoutMs"; + static final String AWS_LIB_LOG_LEVEL_FIELD = "awsLibLogLevel"; + static final String SEND_ASYNC_FIELD = "sendAsync"; + static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents"; + + static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6"; + static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit"; + static final String DEFAULT_INITIAL_POSITION = "latest"; + static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L; + static final Integer DEFAULT_MAX_RECORDS = 100; + static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L; + static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L; + static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L; + static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN; + static final Boolean DEFAULT_SEND_ASYNC = true; + static final Boolean DEFAULT_SEND_STREAM_EVENTS = false; private final String applicationName; private final String streamEventsTopic; @@ -61,56 +77,60 @@ public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) { PluginConfig pluginConfig = configFactory.getFromGerritConfig(pluginName); - this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of); + this.region = + Optional.ofNullable(getStringParam(pluginConfig, REGION_FIELD, null)).map(Region::of); this.endpoint = - Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create); - this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC); + Optional.ofNullable(getStringParam(pluginConfig, ENDPOINT_FIELD, null)).map(URI::create); + this.streamEventsTopic = + getStringParam(pluginConfig, STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC); this.sendStreamEvents = - Optional.ofNullable(getStringParam(pluginConfig, "sendStreamEvents", null)) + Optional.ofNullable(getStringParam(pluginConfig, SEND_STREAM_EVENTS_FIELD, null)) .map(Boolean::new) .orElse(DEFAULT_SEND_STREAM_EVENTS); this.numberOfSubscribers = Integer.parseInt( - getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS)); - this.applicationName = getStringParam(pluginConfig, "applicationName", pluginName); + getStringParam( + pluginConfig, NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS)); + this.applicationName = getStringParam(pluginConfig, APPLICATION_NAME_FIELD, pluginName); this.initialPosition = InitialPositionInStream.valueOf( - getStringParam(pluginConfig, "initialPosition", DEFAULT_INITIAL_POSITION) + getStringParam(pluginConfig, INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION) .toUpperCase()); this.pollingIntervalMs = - Optional.ofNullable(getStringParam(pluginConfig, "pollingIntervalMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, POLLING_INTERVAL_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_POLLING_INTERVAL_MS); this.maxRecords = - Optional.ofNullable(getStringParam(pluginConfig, "maxRecords", null)) + Optional.ofNullable(getStringParam(pluginConfig, MAX_RECORDS_FIELD, null)) .map(Integer::parseInt) .orElse(DEFAULT_MAX_RECORDS); this.publishSingleRequestTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "publishSingleRequestTimeoutMs", null)) + Optional.ofNullable( + getStringParam(pluginConfig, PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS); this.publishTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_PUBLISH_TIMEOUT_MS); this.shutdownTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "shutdownTimeoutMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, SHUTDOWN_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS); this.awsLibLogLevel = - Optional.ofNullable(getStringParam(pluginConfig, "awsLibLogLevel", null)) + Optional.ofNullable(getStringParam(pluginConfig, AWS_LIB_LOG_LEVEL_FIELD, null)) .map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL)) .orElse(DEFAULT_AWS_LIB_LOG_LEVEL); this.sendAsync = - Optional.ofNullable(getStringParam(pluginConfig, "sendAsync", null)) + Optional.ofNullable(getStringParam(pluginConfig, SEND_ASYNC_FIELD, null)) .map(Boolean::new) .orElse(DEFAULT_SEND_ASYNC);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java new file mode 100644 index 0000000..6ab4f83 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java
@@ -0,0 +1,116 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.googlesource.gerrit.plugins.kinesis; + +import static com.googlesource.gerrit.plugins.kinesis.Configuration.APPLICATION_NAME_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.AWS_LIB_LOG_LEVEL_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_AWS_LIB_LOG_LEVEL; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_INITIAL_POSITION; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_MAX_RECORDS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_NUMBER_OF_SUBSCRIBERS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_POLLING_INTERVAL_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_ASYNC; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_STREAM_EVENTS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SHUTDOWN_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_STREAM_EVENTS_TOPIC; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.ENDPOINT_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.INITIAL_POSITION_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.MAX_RECORDS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.NUMBER_OF_SUBSCRIBERS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.POLLING_INTERVAL_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_TIMEOUT_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.REGION_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_ASYNC_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_STREAM_EVENTS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SHUTDOWN_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.STREAM_EVENTS_TOPIC_FIELD; + +import com.google.gerrit.extensions.annotations.PluginName; +import com.google.gerrit.pgm.init.api.ConsoleUI; +import com.google.gerrit.pgm.init.api.InitStep; +import com.google.gerrit.pgm.init.api.Section; +import com.google.gerrit.server.config.GerritInstanceIdProvider; +import com.google.inject.Inject; + +public class InitConfig implements InitStep { + private final Section pluginSection; + private final String pluginName; + private final ConsoleUI ui; + private final GerritInstanceIdProvider gerritInstanceIdProvider; + + @Inject + InitConfig( + Section.Factory sections, + @PluginName String pluginName, + GerritInstanceIdProvider gerritInstanceIdProvider, + ConsoleUI ui) { + this.pluginName = pluginName; + this.ui = ui; + this.gerritInstanceIdProvider = gerritInstanceIdProvider; + this.pluginSection = sections.get("plugin", pluginName); + } + + @Override + public void run() throws Exception { + ui.header(String.format("%s plugin", pluginName)); + + pluginSection.string("AWS region (leave blank for default provider chain)", REGION_FIELD, null); + pluginSection.string("AWS endpoint (dev or testing, not for production)", ENDPOINT_FIELD, null); + + boolean sendStreamEvents = ui.yesno(DEFAULT_SEND_STREAM_EVENTS, "Should send stream events?"); + pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents)); + + if (sendStreamEvents) { + pluginSection.string( + "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC); + } + pluginSection.string( + "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS); + pluginSection.string("Application name", APPLICATION_NAME_FIELD, pluginName); + pluginSection.string("Initial position", INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION); + pluginSection.string( + "Polling Interval (ms)", + POLLING_INTERVAL_MS_FIELD, + Long.toString(DEFAULT_POLLING_INTERVAL_MS)); + pluginSection.string( + "Maximum number of record to fetch", + MAX_RECORDS_FIELD, + Integer.toString(DEFAULT_MAX_RECORDS)); + + pluginSection.string( + "Maximum total time waiting for a publish result (ms)", + PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, + Long.toString(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS)); + + pluginSection.string( + "Maximum total time waiting for publishing, including retries", + PUBLISH_TIMEOUT_MS_FIELD, + Long.toString(DEFAULT_PUBLISH_TIMEOUT_MS)); + + pluginSection.string( + "Maximum total time waiting when shutting down (ms)", + SHUTDOWN_MS_FIELD, + Long.toString(DEFAULT_SHUTDOWN_TIMEOUT_MS)); + pluginSection.string( + "Which level AWS libraries should log at", + AWS_LIB_LOG_LEVEL_FIELD, + DEFAULT_AWS_LIB_LOG_LEVEL.toString()); + + boolean sendAsync = ui.yesno(DEFAULT_SEND_ASYNC, "Should send messages asynchronously?"); + pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync)); + } +}
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md index 34ab8f9..0683975 100644 --- a/src/main/resources/Documentation/Config.md +++ b/src/main/resources/Documentation/Config.md
@@ -113,4 +113,29 @@ ========================= Note that System properties always override and take priority over the above -gerrit.config configuration. \ No newline at end of file +gerrit.config configuration. + +Gerrit init integration +----------------------- + +The plugin provides an init step that helps to set up the configuration. + +``` +*** events-aws-kinesis plugin +*** + +AWS region (leave blank for default provider chain) : +AWS endpoint (dev or testing, not for production) : +Should send stream events? [y/N]? y +Stream events topic [gerrit]: +Number of subscribers [6]: +Application name [events-aws-kinesis]: +Initial position [latest]: +Polling Interval (ms) [1000]: +Maximum number of record to fetch [100]: +The maximum total time waiting for a publish result (ms) [6000]: +The maximum total time waiting for publishing, including retries [6000]: +The maximum total time waiting when shutting down (ms) [20000]: +Which level AWS libraries should log at [WARN]: +Should send messages asynchronously? [Y/n]? +``` \ No newline at end of file