Revert "Remove publishing of stream events"
Revert "Remove publishing of stream events"
Revert "Remove publishing of stream events"
Revert submission 312871-centralize-stream-events-handling
Reason for revert: It broke existing functionality of events-* plugins
Reverted Changes:
I79a059deb:Remove publishing of stream events
I68906e9b4:Remove publishing of stream events
I632f7b900:Remove publishing of stream events
Iafe5a8155:Leverage stream events publishing from the events-...
Change-Id: I2d1e327bda1996eefcb3a775bc8be0a035318fe0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 2fd72b1..ef7e698 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -31,6 +31,7 @@
class Configuration {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
private static final String DEFAULT_INITIAL_POSITION = "latest";
private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
private static final Integer DEFAULT_MAX_RECORDS = 100;
@@ -41,6 +42,7 @@
private static final Boolean DEFAULT_SEND_ASYNC = true;
private final String applicationName;
+ private final String streamEventsTopic;
private final int numberOfSubscribers;
private final InitialPositionInStream initialPosition;
private final Optional<Region> region;
@@ -60,6 +62,7 @@
this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of);
this.endpoint =
Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create);
+ this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC);
this.numberOfSubscribers =
Integer.parseInt(
getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
@@ -114,6 +117,10 @@
endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
}
+ public String getStreamEventsTopic() {
+ return streamEventsTopic;
+ }
+
public int getNumberOfSubscribers() {
return numberOfSubscribers;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 292ffeb..158f8ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -23,6 +23,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -34,7 +35,7 @@
import java.util.concurrent.TimeoutException;
@Singleton
-class KinesisPublisher {
+class KinesisPublisher implements EventListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final KinesisProducer kinesisProducer;
@@ -55,6 +56,11 @@
this.callBackExecutor = callBackExecutor;
}
+ @Override
+ public void onEvent(Event event) {
+ publish(configuration.getStreamEventsTopic(), event);
+ }
+
ListenableFuture<Boolean> publish(String streamName, Event event) {
if (configuration.isSendAsync()) {
return publishAsync(streamName, gson.toJson(event), event.getType());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 7868d3a..9234c67 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -24,6 +24,7 @@
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
@@ -74,6 +75,7 @@
DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
factory(KinesisConsumer.Factory.class);
+ DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
listener().to(AWSLogLevelListener.class);
}
}
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 816c723..03e3650 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -91,6 +91,11 @@
Default: WARN
Allowed values:OFF|FATAL|ERROR|WARN|INFO|DEBUG|TRACE|ALL
+`plugin.events-aws-kinesis.streamEventsTopic`
+: Optional. Name of the kinesis topic for stream events. events-aws-kinesis
+ plugin exposes all stream events under this topic name.
+ Default: gerrit
+
`plugin.events-aws-kinesis.sendAsync`
: Optional. Whether to send messages to Kinesis asynchronously, without
waiting for the result of the operation.