Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Consume events-broker from source
  Allow specifying failoverTimeMillis property for Kinesis consumer
  Allow specifying RecordMaxBufferedTime property for Kinesis producer
  Fix issue with casting aws credentials providers
  Add AWS credentials profile name parameter

Change-Id: If3d0c4993bedb2ebb86a3d5ad1baba37b318a8f3
diff --git a/BUILD b/BUILD
index 5f395ca..1f6cdfa 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
     srcs = glob(["src/main/java/**/*.java"]),
     manifest_entries = [
         "Gerrit-PluginName: events-aws-kinesis",
+        "Gerrit-InitStep: com.googlesource.gerrit.plugins.kinesis.InitConfig",
         "Gerrit-Module: com.googlesource.gerrit.plugins.kinesis.Module",
         "Implementation-Title: Gerrit events listener to send events to AWS Kinesis broker",
         "Implementation-URL: https://gerrit.googlesource.com/plugins/events-aws-kinesis",
@@ -42,8 +43,10 @@
         "@awssdk-protocol-core//jar",
         "@awssdk-query-protocol//jar",
         "@commons-codec//jar",
+        "@commons-lang//jar",
         "@io-netty-all//jar",
         "@jackson-annotations//jar",
+        "@jackson-core//jar",
         "@jackson-databind//jar",
         "@jackson-dataformat-cbor//jar",
         "@javax-xml-bind//jar",
@@ -60,7 +63,6 @@
     tags = ["events-aws-kinesis"],
     deps = [
         ":events-aws-kinesis__plugin_test_deps",
-        "//lib/testcontainers",
         "//plugins/events-broker",
         "@amazon-http-client-spi//jar",
         "@amazon-kinesis-client//jar",
@@ -75,10 +77,13 @@
     visibility = ["//visibility:public"],
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":events-aws-kinesis__plugin",
-        "//lib/jackson:jackson-annotations",
-        "//lib/testcontainers",
-        "//lib/testcontainers:docker-java-api",
-        "//lib/testcontainers:docker-java-transport",
+        "@jackson-annotations//jar",
+        "@testcontainers//jar",
+        "@docker-java-api//jar",
+        "@docker-java-transport//jar",
+        "@duct-tape//jar",
+        "@visible-assertions//jar",
+        "@jna//jar",
         "@amazon-regions//jar",
         "@amazon-auth//jar",
         "@amazon-kinesis//jar",
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index f2fc36a..65abd99 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -102,6 +102,18 @@
     )
 
     maven_jar(
+        name = "jackson-annotations",
+        artifact = "com.fasterxml.jackson.core:jackson-annotations:" + JACKSON_VER,
+        sha1 = "6ae6028aff033f194c9710ad87c224ccaadeed6c",
+    )
+
+    maven_jar(
+        name = "jackson-core",
+        artifact = "com.fasterxml.jackson.core:jackson-core:" + JACKSON_VER,
+        sha1 = "8796585e716440d6dd5128b30359932a9eb74d0d",
+    )
+
+    maven_jar(
         name = "jackson-dataformat-cbor",
         artifact = "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + JACKSON_VER,
         sha1 = "c854bb2d46138198cb5d4aae86ef6c04b8bc1e70",
@@ -156,9 +168,55 @@
     )
 
     maven_jar(
+        name = "commons-lang",
+        artifact = "commons-lang:commons-lang:2.6",
+        sha1 = "0ce1edb914c94ebc388f086c6827e8bdeec71ac2",
+    )
+
+    TESTCONTAINERS_VERSION = "1.15.3"
+
+    maven_jar(
+        name = "testcontainers",
+        artifact = "org.testcontainers:testcontainers:" + TESTCONTAINERS_VERSION,
+        sha1 = "95c6cfde71c2209f0c29cb14e432471e0b111880",
+    )
+
+    maven_jar(
         name = "testcontainer-localstack",
-        artifact = "org.testcontainers:localstack:1.15.2",
-        sha1 = "ae3c4717bc5f37410abbb490cb46d349a77990a0",
+        artifact = "org.testcontainers:localstack:" + TESTCONTAINERS_VERSION,
+        sha1 = "7aa69995bdaafb4b06e69fdab9bd98c4fddee43d",
+    )
+
+    maven_jar(
+        name = "duct-tape",
+        artifact = "org.rnorth.duct-tape:duct-tape:1.0.8",
+        sha1 = "92edc22a9ab2f3e17c9bf700aaee377d50e8b530",
+    )
+
+    maven_jar(
+        name = "visible-assertions",
+        artifact = "org.rnorth.visible-assertions:visible-assertions:2.1.2",
+        sha1 = "20d31a578030ec8e941888537267d3123c2ad1c1",
+    )
+
+    maven_jar(
+        name = "jna",
+        artifact = "net.java.dev.jna:jna:5.5.0",
+        sha1 = "0e0845217c4907822403912ad6828d8e0b256208",
+    )
+
+    DOCKER_JAVA_VERS = "3.2.8"
+
+    maven_jar(
+        name = "docker-java-api",
+        artifact = "com.github.docker-java:docker-java-api:" + DOCKER_JAVA_VERS,
+        sha1 = "4ac22a72d546a9f3523cd4b5fabffa77c4a6ec7c",
+    )
+
+    maven_jar(
+        name = "docker-java-transport",
+        artifact = "com.github.docker-java:docker-java-transport:" + DOCKER_JAVA_VERS,
+        sha1 = "c3b5598c67d0a5e2e780bf48f520da26b9915eab",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index e0edc71..89476cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -30,19 +30,36 @@
 @Singleton
 class Configuration {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
-  private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
-  private static final String DEFAULT_INITIAL_POSITION = "latest";
-  private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
-  private static final Integer DEFAULT_MAX_RECORDS = 100;
-  private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
-  private static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
-  private static final Long DEFAULT_CONSUMER_FAILOVER_TIME_MS = 10000L;
-  private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
-  private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
-  private static final Long DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 60000L; // 5 min
-  private static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
-  private static final Boolean DEFAULT_SEND_ASYNC = true;
+
+  static final String REGION_FIELD = "region";
+  static final String ENDPOINT_FIELD = "endpoint";
+  static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+  static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+  static final String APPLICATION_NAME_FIELD = "applicationName";
+  static final String INITIAL_POSITION_FIELD = "initialPosition";
+  static final String POLLING_INTERVAL_MS_FIELD = "pollingIntervalMs";
+  static final String MAX_RECORDS_FIELD = "maxRecords";
+  static final String PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD = "publishSingleRequestTimeoutMs";
+  static final String PUBLISH_TIMEOUT_MS_FIELD = "publishTimeoutMs";
+  static final String SHUTDOWN_MS_FIELD = "shutdownTimeoutMs";
+  static final String AWS_LIB_LOG_LEVEL_FIELD = "awsLibLogLevel";
+  static final String SEND_ASYNC_FIELD = "sendAsync";
+  static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+
+  static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+  static final String DEFAULT_INITIAL_POSITION = "latest";
+  static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
+  static final Integer DEFAULT_MAX_RECORDS = 100;
+  static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
+  static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
+  static final Long DEFAULT_CONSUMER_FAILOVER_TIME_MS = 10000L;
+  static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
+  static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
+  static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
+  static final Boolean DEFAULT_SEND_ASYNC = true;
+  static final Boolean DEFAULT_SEND_STREAM_EVENTS = false;
+  static final Long DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 60000L; // 5 min
 
   private final String applicationName;
   private final String streamEventsTopic;
@@ -58,6 +75,7 @@
   private final Long checkpointIntervalMs;
   private final Level awsLibLogLevel;
   private final Boolean sendAsync;
+  private final Boolean sendStreamEvents;
   private final Optional<String> awsConfigurationProfileName;
   private final Long publishRecordMaxBufferedTimeMs;
   private final Long consumerFailoverTimeInMs;
@@ -66,32 +84,40 @@
   public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
     PluginConfig pluginConfig = configFactory.getFromGerritConfig(pluginName);
 
-    this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of);
+    this.region =
+        Optional.ofNullable(getStringParam(pluginConfig, REGION_FIELD, null)).map(Region::of);
     this.endpoint =
-        Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create);
-    this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC);
+        Optional.ofNullable(getStringParam(pluginConfig, ENDPOINT_FIELD, null)).map(URI::create);
+    this.streamEventsTopic =
+        getStringParam(pluginConfig, STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+    this.sendStreamEvents =
+        Optional.ofNullable(getStringParam(pluginConfig, SEND_STREAM_EVENTS_FIELD, null))
+            .map(Boolean::new)
+            .orElse(DEFAULT_SEND_STREAM_EVENTS);
     this.numberOfSubscribers =
         Integer.parseInt(
-            getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
-    this.applicationName = getStringParam(pluginConfig, "applicationName", pluginName);
+            getStringParam(
+                pluginConfig, NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS));
+    this.applicationName = getStringParam(pluginConfig, APPLICATION_NAME_FIELD, pluginName);
 
     this.initialPosition =
         InitialPositionInStream.valueOf(
-            getStringParam(pluginConfig, "initialPosition", DEFAULT_INITIAL_POSITION)
+            getStringParam(pluginConfig, INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION)
                 .toUpperCase());
 
     this.pollingIntervalMs =
-        Optional.ofNullable(getStringParam(pluginConfig, "pollingIntervalMs", null))
+        Optional.ofNullable(getStringParam(pluginConfig, POLLING_INTERVAL_MS_FIELD, null))
             .map(Long::parseLong)
             .orElse(DEFAULT_POLLING_INTERVAL_MS);
 
     this.maxRecords =
-        Optional.ofNullable(getStringParam(pluginConfig, "maxRecords", null))
+        Optional.ofNullable(getStringParam(pluginConfig, MAX_RECORDS_FIELD, null))
             .map(Integer::parseInt)
             .orElse(DEFAULT_MAX_RECORDS);
 
     this.publishSingleRequestTimeoutMs =
-        Optional.ofNullable(getStringParam(pluginConfig, "publishSingleRequestTimeoutMs", null))
+        Optional.ofNullable(
+                getStringParam(pluginConfig, PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, null))
             .map(Long::parseLong)
             .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
 
@@ -106,12 +132,12 @@
             .orElse(DEFAULT_CONSUMER_FAILOVER_TIME_MS);
 
     this.publishTimeoutMs =
-        Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null))
+        Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null))
             .map(Long::parseLong)
             .orElse(DEFAULT_PUBLISH_TIMEOUT_MS);
 
     this.shutdownTimeoutMs =
-        Optional.ofNullable(getStringParam(pluginConfig, "shutdownTimeoutMs", null))
+        Optional.ofNullable(getStringParam(pluginConfig, SHUTDOWN_MS_FIELD, null))
             .map(Long::parseLong)
             .orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS);
 
@@ -121,12 +147,12 @@
             .orElse(DEFAULT_CHECKPOINT_INTERVAL_MS);
 
     this.awsLibLogLevel =
-        Optional.ofNullable(getStringParam(pluginConfig, "awsLibLogLevel", null))
+        Optional.ofNullable(getStringParam(pluginConfig, AWS_LIB_LOG_LEVEL_FIELD, null))
             .map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL))
             .orElse(DEFAULT_AWS_LIB_LOG_LEVEL);
 
     this.sendAsync =
-        Optional.ofNullable(getStringParam(pluginConfig, "sendAsync", null))
+        Optional.ofNullable(getStringParam(pluginConfig, SEND_ASYNC_FIELD, null))
             .map(Boolean::new)
             .orElse(DEFAULT_SEND_ASYNC);
 
@@ -221,4 +247,8 @@
   public Boolean isSendAsync() {
     return sendAsync;
   }
+
+  public Boolean isSendStreamEvents() {
+    return sendStreamEvents;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java
new file mode 100644
index 0000000..6ab4f83
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java
@@ -0,0 +1,116 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.APPLICATION_NAME_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.AWS_LIB_LOG_LEVEL_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_AWS_LIB_LOG_LEVEL;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_INITIAL_POSITION;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_MAX_RECORDS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_POLLING_INTERVAL_MS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_TIMEOUT_MS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_ASYNC;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_STREAM_EVENTS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SHUTDOWN_TIMEOUT_MS;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_STREAM_EVENTS_TOPIC;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.ENDPOINT_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.INITIAL_POSITION_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.MAX_RECORDS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.NUMBER_OF_SUBSCRIBERS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.POLLING_INTERVAL_MS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_TIMEOUT_MS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.REGION_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.SHUTDOWN_MS_FIELD;
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.STREAM_EVENTS_TOPIC_FIELD;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+  private final Section pluginSection;
+  private final String pluginName;
+  private final ConsoleUI ui;
+  private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+  @Inject
+  InitConfig(
+      Section.Factory sections,
+      @PluginName String pluginName,
+      GerritInstanceIdProvider gerritInstanceIdProvider,
+      ConsoleUI ui) {
+    this.pluginName = pluginName;
+    this.ui = ui;
+    this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+    this.pluginSection = sections.get("plugin", pluginName);
+  }
+
+  @Override
+  public void run() throws Exception {
+    ui.header(String.format("%s plugin", pluginName));
+
+    pluginSection.string("AWS region (leave blank for default provider chain)", REGION_FIELD, null);
+    pluginSection.string("AWS endpoint (dev or testing, not for production)", ENDPOINT_FIELD, null);
+
+    boolean sendStreamEvents = ui.yesno(DEFAULT_SEND_STREAM_EVENTS, "Should send stream events?");
+    pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+    if (sendStreamEvents) {
+      pluginSection.string(
+          "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+    }
+    pluginSection.string(
+        "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+    pluginSection.string("Application name", APPLICATION_NAME_FIELD, pluginName);
+    pluginSection.string("Initial position", INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION);
+    pluginSection.string(
+        "Polling Interval (ms)",
+        POLLING_INTERVAL_MS_FIELD,
+        Long.toString(DEFAULT_POLLING_INTERVAL_MS));
+    pluginSection.string(
+        "Maximum number of record to fetch",
+        MAX_RECORDS_FIELD,
+        Integer.toString(DEFAULT_MAX_RECORDS));
+
+    pluginSection.string(
+        "Maximum total time waiting for a publish result (ms)",
+        PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD,
+        Long.toString(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS));
+
+    pluginSection.string(
+        "Maximum total time waiting for publishing, including retries",
+        PUBLISH_TIMEOUT_MS_FIELD,
+        Long.toString(DEFAULT_PUBLISH_TIMEOUT_MS));
+
+    pluginSection.string(
+        "Maximum total time waiting when shutting down (ms)",
+        SHUTDOWN_MS_FIELD,
+        Long.toString(DEFAULT_SHUTDOWN_TIMEOUT_MS));
+    pluginSection.string(
+        "Which level AWS libraries should log at",
+        AWS_LIB_LOG_LEVEL_FIELD,
+        DEFAULT_AWS_LIB_LOG_LEVEL.toString());
+
+    boolean sendAsync = ui.yesno(DEFAULT_SEND_ASYNC, "Should send messages asynchronously?");
+    pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index 598aa6c..b794aaf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -122,7 +122,7 @@
   }
 
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
-    logger.atInfo().log("Checkpointing shard: " + kinesisShardId);
+    logger.atInfo().log("Checkpointing shard: %s", kinesisShardId);
     try {
       checkpointer.checkpoint();
     } catch (ShutdownException se) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 10d752c..6fcf93f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -37,8 +37,14 @@
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 
 public class Module extends LifecycleModule {
+  private final Configuration configuration;
   private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
 
+  @Inject
+  Module(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
   /**
    * By default the events-broker library (loaded directly by the multi-site) registers a noop
    * implementation, which itself registers a list of topic subscribers. Since we have no control
@@ -78,7 +84,9 @@
     DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
     DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
     factory(KinesisConsumer.Factory.class);
-    DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
+    if (configuration.isSendStreamEvents()) {
+      DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
+    }
     listener().to(AWSLogLevelListener.class);
   }
 }
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 9758aab..f3e5ffa 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -121,6 +121,10 @@
     plugin exposes all stream events under this topic name.
     Default: gerrit
 
+`plugin.events-aws-kinesis.sendStreamEvents`
+:   Whether to send stream events to the `streamEventsTopic` topic.
+    Default: false
+
 `plugin.events-aws-kinesis.sendAsync`
 :   Optional. Whether to send messages to Kinesis asynchronously, without
     waiting for the result of the operation.
@@ -140,4 +144,29 @@
 =========================
 
 Note that System properties always override and take priority over the above
-gerrit.config configuration.
\ No newline at end of file
+gerrit.config configuration.
+
+Gerrit init integration
+-----------------------
+
+The plugin provides an init step that helps to set up the configuration.
+
+```
+*** events-aws-kinesis plugin
+***
+
+AWS region (leave blank for default provider chain) :
+AWS endpoint (dev or testing, not for production) :
+Should send stream events?     [y/N]? y
+Stream events topic            [gerrit]:
+Number of subscribers          [6]:
+Application name               [events-aws-kinesis]:
+Initial position               [latest]:
+Polling Interval (ms)          [1000]:
+Maximum number of record to fetch [100]:
+The maximum total time waiting for a publish result (ms) [6000]:
+The maximum total time waiting for publishing, including retries [6000]:
+The maximum total time waiting when shutting down (ms) [20000]:
+Which level AWS libraries should log at [WARN]:
+Should send messages asynchronously? [Y/n]?
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
index 401838e..9ce2cba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
@@ -94,6 +94,27 @@
   }
 
   @Test
+  public void shouldConfigureSendStreamEvents() {
+    pluginConfig.setBoolean("sendStreamEvents", true);
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.isSendStreamEvents()).isEqualTo(true);
+  }
+
+  @Test
+  public void shouldDefaultSendStreamEvents() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.isSendStreamEvents()).isEqualTo(false);
+  }
+
+  @Test
   public void shouldReturnAWSProfileNameWhenConfigured() {
     String awsProfileName = "aws_profile_name";
     pluginConfig.setString("profileName", awsProfileName);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index f70a5be..7449440 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -57,6 +57,7 @@
   private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
   private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
   private static final long SEND_TIMEOUT_MILLIS = 200;
+  private static final String STREAM_EVENTS = "stream_events";
 
   private static final int LOCALSTACK_PORT = 4566;
 
@@ -219,6 +220,28 @@
     assertThat(result.get()).isTrue();
   }
 
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  @GerritConfig(name = "plugin.events-aws-kinesis.topic", value = STREAM_EVENTS)
+  @GerritConfig(name = "plugin.events-aws-kinesis.sendStreamEvents", value = "true")
+  public void shouldSendStreamEventsWhenEnabled() throws Exception {
+    createStreamAndWait(STREAM_EVENTS, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(STREAM_EVENTS, eventConsumerCounter);
+
+    createChange();
+
+    // There are 4 events are received in the following order:
+    // 1. refUpdate:        ref: refs/sequences/changes
+    // 2. refUpdate:        ref: refs/changes/01/1/1
+    // 3. refUpdate:        ref: refs/changes/01/1/meta
+    // 4. patchset-created: ref: refs/changes/01/1/1
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 4, WAIT_FOR_CONSUMPTION);
+  }
+
   public KinesisBrokerApi kinesisBroker() {
     return (KinesisBrokerApi) plugin.getSysInjector().getInstance(BrokerApi.class);
   }