Revert "Remove publishing of stream events"

Revert "Remove publishing of stream events"

Revert "Remove publishing of stream events"

Revert submission 312871-centralize-stream-events-handling

Reason for revert: It broke existing functionality of events-* plugins
Reverted Changes:
I79a059deb:Remove publishing of stream events
I68906e9b4:Remove publishing of stream events
I632f7b900:Remove publishing of stream events
Iafe5a8155:Leverage stream events publishing from the events-...

Change-Id: I2d1e327bda1996eefcb3a775bc8be0a035318fe0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 2fd72b1..ef7e698 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -31,6 +31,7 @@
 class Configuration {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
   private static final String DEFAULT_INITIAL_POSITION = "latest";
   private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
   private static final Integer DEFAULT_MAX_RECORDS = 100;
@@ -41,6 +42,7 @@
   private static final Boolean DEFAULT_SEND_ASYNC = true;
 
   private final String applicationName;
+  private final String streamEventsTopic;
   private final int numberOfSubscribers;
   private final InitialPositionInStream initialPosition;
   private final Optional<Region> region;
@@ -60,6 +62,7 @@
     this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of);
     this.endpoint =
         Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create);
+    this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC);
     this.numberOfSubscribers =
         Integer.parseInt(
             getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
@@ -114,6 +117,10 @@
         endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
   }
 
+  public String getStreamEventsTopic() {
+    return streamEventsTopic;
+  }
+
   public int getNumberOfSubscribers() {
     return numberOfSubscribers;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 292ffeb..158f8ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -34,7 +35,7 @@
 import java.util.concurrent.TimeoutException;
 
 @Singleton
-class KinesisPublisher {
+class KinesisPublisher implements EventListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final KinesisProducer kinesisProducer;
@@ -55,6 +56,11 @@
     this.callBackExecutor = callBackExecutor;
   }
 
+  @Override
+  public void onEvent(Event event) {
+    publish(configuration.getStreamEventsTopic(), event);
+  }
+
   ListenableFuture<Boolean> publish(String streamName, Event event) {
     if (configuration.isSendAsync()) {
       return publishAsync(streamName, gson.toJson(event), event.getType());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 7868d3a..9234c67 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
@@ -74,6 +75,7 @@
     DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
     DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
     factory(KinesisConsumer.Factory.class);
+    DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
     listener().to(AWSLogLevelListener.class);
   }
 }
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 816c723..03e3650 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -91,6 +91,11 @@
   Default: WARN
   Allowed values:OFF|FATAL|ERROR|WARN|INFO|DEBUG|TRACE|ALL
 
+`plugin.events-aws-kinesis.streamEventsTopic`
+:   Optional. Name of the kinesis topic for stream events. events-aws-kinesis
+    plugin exposes all stream events under this topic name.
+    Default: gerrit
+
 `plugin.events-aws-kinesis.sendAsync`
 :   Optional. Whether to send messages to Kinesis asynchronously, without
     waiting for the result of the operation.