Merge branch 'stable-3.4' into stable-3.5 * stable-3.4: Log at lower level records processing Reduce verbose logging when processing message Change-Id: I2b17a3ba19351cdc2d6535e9584a2300e4474c88
diff --git a/BUILD b/BUILD index fbbd1d6..2aa6678 100644 --- a/BUILD +++ b/BUILD
@@ -12,6 +12,7 @@ srcs = glob(["src/main/java/**/*.java"]), manifest_entries = [ "Gerrit-PluginName: events-aws-kinesis", + "Gerrit-InitStep: com.googlesource.gerrit.plugins.kinesis.InitConfig", "Gerrit-Module: com.googlesource.gerrit.plugins.kinesis.Module", "Implementation-Title: Gerrit events listener to send events to AWS Kinesis broker", "Implementation-URL: https://gerrit.googlesource.com/plugins/events-aws-kinesis", @@ -41,9 +42,11 @@ "@awssdk-protocol-core//jar", "@awssdk-query-protocol//jar", "@commons-codec//jar", + "@commons-lang//jar", "@events-broker//jar:neverlink", "@io-netty-all//jar", "@jackson-annotations//jar", + "@jackson-core//jar", "@jackson-databind//jar", "@jackson-dataformat-cbor//jar", "@javax-xml-bind//jar", @@ -60,7 +63,6 @@ tags = ["events-aws-kinesis"], deps = [ ":events-aws-kinesis__plugin_test_deps", - "//lib/testcontainers", "@amazon-http-client-spi//jar", "@amazon-kinesis-client//jar", "@amazon-kinesis//jar", @@ -75,10 +77,13 @@ visibility = ["//visibility:public"], exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [ ":events-aws-kinesis__plugin", - "//lib/jackson:jackson-annotations", - "//lib/testcontainers", - "//lib/testcontainers:docker-java-api", - "//lib/testcontainers:docker-java-transport", + "@jackson-annotations//jar", + "@testcontainers//jar", + "@docker-java-api//jar", + "@docker-java-transport//jar", + "@duct-tape//jar", + "@visible-assertions//jar", + "@jna//jar", "@amazon-regions//jar", "@amazon-auth//jar", "@amazon-kinesis//jar",
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl index 388a15e..9e44a2e 100644 --- a/external_plugin_deps.bzl +++ b/external_plugin_deps.bzl
@@ -102,6 +102,18 @@ ) maven_jar( + name = "jackson-annotations", + artifact = "com.fasterxml.jackson.core:jackson-annotations:" + JACKSON_VER, + sha1 = "6ae6028aff033f194c9710ad87c224ccaadeed6c", + ) + + maven_jar( + name = "jackson-core", + artifact = "com.fasterxml.jackson.core:jackson-core:" + JACKSON_VER, + sha1 = "8796585e716440d6dd5128b30359932a9eb74d0d", + ) + + maven_jar( name = "jackson-dataformat-cbor", artifact = "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + JACKSON_VER, sha1 = "c854bb2d46138198cb5d4aae86ef6c04b8bc1e70", @@ -109,8 +121,8 @@ maven_jar( name = "events-broker", - artifact = "com.gerritforge:events-broker:3.4.0.4", - sha1 = "8d361d863382290e33828116e65698190118d0f1", + artifact = "com.gerritforge:events-broker:3.5.1", + sha1 = "78b8bc6ad7fd7caadcc1c6e3484332464de0ac38", ) maven_jar( @@ -162,9 +174,55 @@ ) maven_jar( + name = "commons-lang", + artifact = "commons-lang:commons-lang:2.6", + sha1 = "0ce1edb914c94ebc388f086c6827e8bdeec71ac2", + ) + + TESTCONTAINERS_VERSION = "1.15.3" + + maven_jar( + name = "testcontainers", + artifact = "org.testcontainers:testcontainers:" + TESTCONTAINERS_VERSION, + sha1 = "95c6cfde71c2209f0c29cb14e432471e0b111880", + ) + + maven_jar( name = "testcontainer-localstack", - artifact = "org.testcontainers:localstack:1.15.2", - sha1 = "ae3c4717bc5f37410abbb490cb46d349a77990a0", + artifact = "org.testcontainers:localstack:" + TESTCONTAINERS_VERSION, + sha1 = "7aa69995bdaafb4b06e69fdab9bd98c4fddee43d", + ) + + maven_jar( + name = "duct-tape", + artifact = "org.rnorth.duct-tape:duct-tape:1.0.8", + sha1 = "92edc22a9ab2f3e17c9bf700aaee377d50e8b530", + ) + + maven_jar( + name = "visible-assertions", + artifact = "org.rnorth.visible-assertions:visible-assertions:2.1.2", + sha1 = "20d31a578030ec8e941888537267d3123c2ad1c1", + ) + + maven_jar( + name = "jna", + artifact = "net.java.dev.jna:jna:5.5.0", + sha1 = "0e0845217c4907822403912ad6828d8e0b256208", + ) + + DOCKER_JAVA_VERS = "3.2.8" + + maven_jar( + name = "docker-java-api", + artifact = "com.github.docker-java:docker-java-api:" + DOCKER_JAVA_VERS, + sha1 = "4ac22a72d546a9f3523cd4b5fabffa77c4a6ec7c", + ) + + maven_jar( + name = "docker-java-transport", + artifact = "com.github.docker-java:docker-java-transport:" + DOCKER_JAVA_VERS, + sha1 = "c3b5598c67d0a5e2e780bf48f520da26b9915eab", ) maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java index ef7e698..2a15d2c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -30,16 +30,33 @@ @Singleton class Configuration { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6"; - private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit"; - private static final String DEFAULT_INITIAL_POSITION = "latest"; - private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L; - private static final Integer DEFAULT_MAX_RECORDS = 100; - private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L; - private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L; - private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L; - private static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN; - private static final Boolean DEFAULT_SEND_ASYNC = true; + + static final String REGION_FIELD = "region"; + static final String ENDPOINT_FIELD = "endpoint"; + static final String STREAM_EVENTS_TOPIC_FIELD = "topic"; + static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers"; + static final String APPLICATION_NAME_FIELD = "applicationName"; + static final String INITIAL_POSITION_FIELD = "initialPosition"; + static final String POLLING_INTERVAL_MS_FIELD = "pollingIntervalMs"; + static final String MAX_RECORDS_FIELD = "maxRecords"; + static final String PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD = "publishSingleRequestTimeoutMs"; + static final String PUBLISH_TIMEOUT_MS_FIELD = "publishTimeoutMs"; + static final String SHUTDOWN_MS_FIELD = "shutdownTimeoutMs"; + static final String AWS_LIB_LOG_LEVEL_FIELD = "awsLibLogLevel"; + static final String SEND_ASYNC_FIELD = "sendAsync"; + static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents"; + + static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6"; + static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit"; + static final String DEFAULT_INITIAL_POSITION = "latest"; + static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L; + static final Integer DEFAULT_MAX_RECORDS = 100; + static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L; + static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L; + static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L; + static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN; + static final Boolean DEFAULT_SEND_ASYNC = true; + static final Boolean DEFAULT_SEND_STREAM_EVENTS = false; private final String applicationName; private final String streamEventsTopic; @@ -54,57 +71,66 @@ private final Long shutdownTimeoutMs; private final Level awsLibLogLevel; private final Boolean sendAsync; + private final Boolean sendStreamEvents; @Inject public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) { PluginConfig pluginConfig = configFactory.getFromGerritConfig(pluginName); - this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of); + this.region = + Optional.ofNullable(getStringParam(pluginConfig, REGION_FIELD, null)).map(Region::of); this.endpoint = - Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create); - this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC); + Optional.ofNullable(getStringParam(pluginConfig, ENDPOINT_FIELD, null)).map(URI::create); + this.streamEventsTopic = + getStringParam(pluginConfig, STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC); + this.sendStreamEvents = + Optional.ofNullable(getStringParam(pluginConfig, SEND_STREAM_EVENTS_FIELD, null)) + .map(Boolean::new) + .orElse(DEFAULT_SEND_STREAM_EVENTS); this.numberOfSubscribers = Integer.parseInt( - getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS)); - this.applicationName = getStringParam(pluginConfig, "applicationName", pluginName); + getStringParam( + pluginConfig, NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS)); + this.applicationName = getStringParam(pluginConfig, APPLICATION_NAME_FIELD, pluginName); this.initialPosition = InitialPositionInStream.valueOf( - getStringParam(pluginConfig, "initialPosition", DEFAULT_INITIAL_POSITION) + getStringParam(pluginConfig, INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION) .toUpperCase()); this.pollingIntervalMs = - Optional.ofNullable(getStringParam(pluginConfig, "pollingIntervalMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, POLLING_INTERVAL_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_POLLING_INTERVAL_MS); this.maxRecords = - Optional.ofNullable(getStringParam(pluginConfig, "maxRecords", null)) + Optional.ofNullable(getStringParam(pluginConfig, MAX_RECORDS_FIELD, null)) .map(Integer::parseInt) .orElse(DEFAULT_MAX_RECORDS); this.publishSingleRequestTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "publishSingleRequestTimeoutMs", null)) + Optional.ofNullable( + getStringParam(pluginConfig, PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS); this.publishTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_PUBLISH_TIMEOUT_MS); this.shutdownTimeoutMs = - Optional.ofNullable(getStringParam(pluginConfig, "shutdownTimeoutMs", null)) + Optional.ofNullable(getStringParam(pluginConfig, SHUTDOWN_MS_FIELD, null)) .map(Long::parseLong) .orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS); this.awsLibLogLevel = - Optional.ofNullable(getStringParam(pluginConfig, "awsLibLogLevel", null)) + Optional.ofNullable(getStringParam(pluginConfig, AWS_LIB_LOG_LEVEL_FIELD, null)) .map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL)) .orElse(DEFAULT_AWS_LIB_LOG_LEVEL); this.sendAsync = - Optional.ofNullable(getStringParam(pluginConfig, "sendAsync", null)) + Optional.ofNullable(getStringParam(pluginConfig, SEND_ASYNC_FIELD, null)) .map(Boolean::new) .orElse(DEFAULT_SEND_ASYNC); @@ -179,4 +205,8 @@ public Boolean isSendAsync() { return sendAsync; } + + public Boolean isSendStreamEvents() { + return sendStreamEvents; + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java new file mode 100644 index 0000000..6ab4f83 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/InitConfig.java
@@ -0,0 +1,116 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.googlesource.gerrit.plugins.kinesis; + +import static com.googlesource.gerrit.plugins.kinesis.Configuration.APPLICATION_NAME_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.AWS_LIB_LOG_LEVEL_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_AWS_LIB_LOG_LEVEL; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_INITIAL_POSITION; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_MAX_RECORDS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_NUMBER_OF_SUBSCRIBERS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_POLLING_INTERVAL_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_PUBLISH_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_ASYNC; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SEND_STREAM_EVENTS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_SHUTDOWN_TIMEOUT_MS; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.DEFAULT_STREAM_EVENTS_TOPIC; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.ENDPOINT_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.INITIAL_POSITION_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.MAX_RECORDS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.NUMBER_OF_SUBSCRIBERS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.POLLING_INTERVAL_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.PUBLISH_TIMEOUT_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.REGION_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_ASYNC_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SEND_STREAM_EVENTS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.SHUTDOWN_MS_FIELD; +import static com.googlesource.gerrit.plugins.kinesis.Configuration.STREAM_EVENTS_TOPIC_FIELD; + +import com.google.gerrit.extensions.annotations.PluginName; +import com.google.gerrit.pgm.init.api.ConsoleUI; +import com.google.gerrit.pgm.init.api.InitStep; +import com.google.gerrit.pgm.init.api.Section; +import com.google.gerrit.server.config.GerritInstanceIdProvider; +import com.google.inject.Inject; + +public class InitConfig implements InitStep { + private final Section pluginSection; + private final String pluginName; + private final ConsoleUI ui; + private final GerritInstanceIdProvider gerritInstanceIdProvider; + + @Inject + InitConfig( + Section.Factory sections, + @PluginName String pluginName, + GerritInstanceIdProvider gerritInstanceIdProvider, + ConsoleUI ui) { + this.pluginName = pluginName; + this.ui = ui; + this.gerritInstanceIdProvider = gerritInstanceIdProvider; + this.pluginSection = sections.get("plugin", pluginName); + } + + @Override + public void run() throws Exception { + ui.header(String.format("%s plugin", pluginName)); + + pluginSection.string("AWS region (leave blank for default provider chain)", REGION_FIELD, null); + pluginSection.string("AWS endpoint (dev or testing, not for production)", ENDPOINT_FIELD, null); + + boolean sendStreamEvents = ui.yesno(DEFAULT_SEND_STREAM_EVENTS, "Should send stream events?"); + pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents)); + + if (sendStreamEvents) { + pluginSection.string( + "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC); + } + pluginSection.string( + "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS); + pluginSection.string("Application name", APPLICATION_NAME_FIELD, pluginName); + pluginSection.string("Initial position", INITIAL_POSITION_FIELD, DEFAULT_INITIAL_POSITION); + pluginSection.string( + "Polling Interval (ms)", + POLLING_INTERVAL_MS_FIELD, + Long.toString(DEFAULT_POLLING_INTERVAL_MS)); + pluginSection.string( + "Maximum number of record to fetch", + MAX_RECORDS_FIELD, + Integer.toString(DEFAULT_MAX_RECORDS)); + + pluginSection.string( + "Maximum total time waiting for a publish result (ms)", + PUBLISH_SINGLE_REQUEST_TIMEOUT_MS_FIELD, + Long.toString(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS)); + + pluginSection.string( + "Maximum total time waiting for publishing, including retries", + PUBLISH_TIMEOUT_MS_FIELD, + Long.toString(DEFAULT_PUBLISH_TIMEOUT_MS)); + + pluginSection.string( + "Maximum total time waiting when shutting down (ms)", + SHUTDOWN_MS_FIELD, + Long.toString(DEFAULT_SHUTDOWN_TIMEOUT_MS)); + pluginSection.string( + "Which level AWS libraries should log at", + AWS_LIB_LOG_LEVEL_FIELD, + DEFAULT_AWS_LIB_LOG_LEVEL.toString()); + + boolean sendAsync = ui.yesno(DEFAULT_SEND_ASYNC, "Should send messages asynchronously?"); + pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync)); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java index 10d752c..6fcf93f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -37,8 +37,14 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; public class Module extends LifecycleModule { + private final Configuration configuration; private Set<TopicSubscriber> activeConsumers = Sets.newHashSet(); + @Inject + Module(Configuration configuration) { + this.configuration = configuration; + } + /** * By default the events-broker library (loaded directly by the multi-site) registers a noop * implementation, which itself registers a list of topic subscribers. Since we have no control @@ -78,7 +84,9 @@ DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON); DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class); factory(KinesisConsumer.Factory.class); - DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class); + if (configuration.isSendStreamEvents()) { + DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class); + } listener().to(AWSLogLevelListener.class); } }
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md index 03e3650..0683975 100644 --- a/src/main/resources/Documentation/Config.md +++ b/src/main/resources/Documentation/Config.md
@@ -96,6 +96,10 @@ plugin exposes all stream events under this topic name. Default: gerrit +`plugin.events-aws-kinesis.sendStreamEvents` +: Whether to send stream events to the `streamEventsTopic` topic. + Default: false + `plugin.events-aws-kinesis.sendAsync` : Optional. Whether to send messages to Kinesis asynchronously, without waiting for the result of the operation. @@ -109,4 +113,29 @@ ========================= Note that System properties always override and take priority over the above -gerrit.config configuration. \ No newline at end of file +gerrit.config configuration. + +Gerrit init integration +----------------------- + +The plugin provides an init step that helps to set up the configuration. + +``` +*** events-aws-kinesis plugin +*** + +AWS region (leave blank for default provider chain) : +AWS endpoint (dev or testing, not for production) : +Should send stream events? [y/N]? y +Stream events topic [gerrit]: +Number of subscribers [6]: +Application name [events-aws-kinesis]: +Initial position [latest]: +Polling Interval (ms) [1000]: +Maximum number of record to fetch [100]: +The maximum total time waiting for a publish result (ms) [6000]: +The maximum total time waiting for publishing, including retries [6000]: +The maximum total time waiting when shutting down (ms) [20000]: +Which level AWS libraries should log at [WARN]: +Should send messages asynchronously? [Y/n]? +``` \ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java index 888c7f1..a83b04b 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
@@ -91,4 +91,25 @@ assertThat(configuration.isSendAsync()).isEqualTo(false); } + + @Test + public void shouldConfigureSendStreamEvents() { + pluginConfig.setBoolean("sendStreamEvents", true); + when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME)) + .thenReturn(pluginConfig.asPluginConfig()); + + Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME); + + assertThat(configuration.isSendStreamEvents()).isEqualTo(true); + } + + @Test + public void shouldDefaultSendStreamEvents() { + when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME)) + .thenReturn(pluginConfig.asPluginConfig()); + + Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME); + + assertThat(configuration.isSendStreamEvents()).isEqualTo(false); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java index f70a5be..7449440 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -57,6 +57,7 @@ private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120); private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10); private static final long SEND_TIMEOUT_MILLIS = 200; + private static final String STREAM_EVENTS = "stream_events"; private static final int LOCALSTACK_PORT = 4566; @@ -219,6 +220,28 @@ assertThat(result.get()).isTrue(); } + @Test + @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer") + @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon") + @GerritConfig(name = "plugin.events-aws-kinesis.topic", value = STREAM_EVENTS) + @GerritConfig(name = "plugin.events-aws-kinesis.sendStreamEvents", value = "true") + public void shouldSendStreamEventsWhenEnabled() throws Exception { + createStreamAndWait(STREAM_EVENTS, STREAM_CREATION_TIMEOUT); + + EventConsumerCounter eventConsumerCounter = new EventConsumerCounter(); + kinesisBroker().receiveAsync(STREAM_EVENTS, eventConsumerCounter); + + createChange(); + + // There are 4 events are received in the following order: + // 1. refUpdate: ref: refs/sequences/changes + // 2. refUpdate: ref: refs/changes/01/1/1 + // 3. refUpdate: ref: refs/changes/01/1/meta + // 4. patchset-created: ref: refs/changes/01/1/1 + WaitUtil.waitUntil( + () -> eventConsumerCounter.getConsumedMessages().size() == 4, WAIT_FOR_CONSUMPTION); + } + public KinesisBrokerApi kinesisBroker() { return (KinesisBrokerApi) plugin.getSysInjector().getInstance(BrokerApi.class); }