Remove publishing of stream events
From version 3.5.0-alpha-202107290338 events-broker library
provides an implementation of StreamEventPublisher which is
bound by multi-site directly in [1], removing the need for registering
any event listener in events-aws-kinesis.
[1] https://gerrit-review.googlesource.com/c/plugins/multi-site/+/312871
Bug: Issue 14837
Change-Id: I632f7b9001c9719fbc23928a4e93688d3a1070d4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index ef7e698..2fd72b1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -31,7 +31,6 @@
class Configuration {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
- private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
private static final String DEFAULT_INITIAL_POSITION = "latest";
private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
private static final Integer DEFAULT_MAX_RECORDS = 100;
@@ -42,7 +41,6 @@
private static final Boolean DEFAULT_SEND_ASYNC = true;
private final String applicationName;
- private final String streamEventsTopic;
private final int numberOfSubscribers;
private final InitialPositionInStream initialPosition;
private final Optional<Region> region;
@@ -62,7 +60,6 @@
this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of);
this.endpoint =
Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create);
- this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC);
this.numberOfSubscribers =
Integer.parseInt(
getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
@@ -117,10 +114,6 @@
endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
}
- public String getStreamEventsTopic() {
- return streamEventsTopic;
- }
-
public int getNumberOfSubscribers() {
return numberOfSubscribers;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 158f8ac..292ffeb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -23,7 +23,6 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -35,7 +34,7 @@
import java.util.concurrent.TimeoutException;
@Singleton
-class KinesisPublisher implements EventListener {
+class KinesisPublisher {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final KinesisProducer kinesisProducer;
@@ -56,11 +55,6 @@
this.callBackExecutor = callBackExecutor;
}
- @Override
- public void onEvent(Event event) {
- publish(configuration.getStreamEventsTopic(), event);
- }
-
ListenableFuture<Boolean> publish(String streamName, Event event) {
if (configuration.isSendAsync()) {
return publishAsync(streamName, gson.toJson(event), event.getType());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 9234c67..7868d3a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -24,7 +24,6 @@
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
@@ -75,7 +74,6 @@
DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
factory(KinesisConsumer.Factory.class);
- DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
listener().to(AWSLogLevelListener.class);
}
}
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 03e3650..816c723 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -91,11 +91,6 @@
Default: WARN
Allowed values:OFF|FATAL|ERROR|WARN|INFO|DEBUG|TRACE|ALL
-`plugin.events-aws-kinesis.streamEventsTopic`
-: Optional. Name of the kinesis topic for stream events. events-aws-kinesis
- plugin exposes all stream events under this topic name.
- Default: gerrit
-
`plugin.events-aws-kinesis.sendAsync`
: Optional. Whether to send messages to Kinesis asynchronously, without
waiting for the result of the operation.