Remove publishing of stream events From version 3.5.0-alpha-202107290338 events-broker library provides an implementation of StreamEventPublisher which is bound by multi-site directly in [1], removing the need for registering any event listener in events-aws-kinesis. [1] https://gerrit-review.googlesource.com/c/plugins/multi-site/+/312871 Bug: Issue 14837 Change-Id: I632f7b9001c9719fbc23928a4e93688d3a1070d4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java index ef7e698..2fd72b1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -31,7 +31,6 @@ class Configuration { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6"; - private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit"; private static final String DEFAULT_INITIAL_POSITION = "latest"; private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L; private static final Integer DEFAULT_MAX_RECORDS = 100; @@ -42,7 +41,6 @@ private static final Boolean DEFAULT_SEND_ASYNC = true; private final String applicationName; - private final String streamEventsTopic; private final int numberOfSubscribers; private final InitialPositionInStream initialPosition; private final Optional<Region> region; @@ -62,7 +60,6 @@ this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of); this.endpoint = Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create); - this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC); this.numberOfSubscribers = Integer.parseInt( getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS)); @@ -117,10 +114,6 @@ endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse("")); } - public String getStreamEventsTopic() { - return streamEventsTopic; - } - public int getNumberOfSubscribers() { return numberOfSubscribers; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java index 158f8ac..292ffeb 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventGson; -import com.google.gerrit.server.events.EventListener; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -35,7 +34,7 @@ import java.util.concurrent.TimeoutException; @Singleton -class KinesisPublisher implements EventListener { +class KinesisPublisher { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final KinesisProducer kinesisProducer; @@ -56,11 +55,6 @@ this.callBackExecutor = callBackExecutor; } - @Override - public void onEvent(Event event) { - publish(configuration.getStreamEventsTopic(), event); - } - ListenableFuture<Boolean> publish(String streamName, Event event) { if (configuration.isSendAsync()) { return publishAsync(streamName, gson.toJson(event), event.getType());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java index 9234c67..7868d3a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -24,7 +24,6 @@ import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.lifecycle.LifecycleModule; -import com.google.gerrit.server.events.EventListener; import com.google.inject.Inject; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; @@ -75,7 +74,6 @@ DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON); DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class); factory(KinesisConsumer.Factory.class); - DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class); listener().to(AWSLogLevelListener.class); } }
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md index 03e3650..816c723 100644 --- a/src/main/resources/Documentation/Config.md +++ b/src/main/resources/Documentation/Config.md
@@ -91,11 +91,6 @@ Default: WARN Allowed values:OFF|FATAL|ERROR|WARN|INFO|DEBUG|TRACE|ALL -`plugin.events-aws-kinesis.streamEventsTopic` -: Optional. Name of the kinesis topic for stream events. events-aws-kinesis - plugin exposes all stream events under this topic name. - Default: gerrit - `plugin.events-aws-kinesis.sendAsync` : Optional. Whether to send messages to Kinesis asynchronously, without waiting for the result of the operation.