Revert "Remove publishing of stream events"

Revert submission 312871-centralize-stream-events-handling

Reason for revert: It broke existing functionality of events-* plugins
Reverted Changes:
I79a059deb:Remove publishing of stream events
I68906e9b4:Remove publishing of stream events
I632f7b900:Remove publishing of stream events
Iafe5a8155:Leverage stream events publishing from the events-...

Change-Id: Ibe7494cf802cb3f180a1b71cd0e0f3a95eacfc4e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index d285146..2768dd3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -16,10 +16,12 @@
 
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
@@ -35,6 +37,7 @@
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
+    DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
 
     bind(new TypeLiteral<KafkaProducer<String, String>>() {})
         .toProvider(KafkaProducerProvider.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index a45549c..72d7f91 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -33,6 +33,7 @@
 
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
+  private final String topic;
   private final boolean sendAsync;
 
   @Inject
@@ -40,6 +41,7 @@
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+    topic = fromGerritConfig.getString("topic", "gerrit");
     sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
@@ -49,6 +51,7 @@
   public KafkaProperties(boolean sendAsync) {
     super();
     setDefaults();
+    topic = "gerrit";
     this.sendAsync = sendAsync;
     initDockerizedKafkaServer();
   }
@@ -85,6 +88,10 @@
     }
   }
 
+  public String getTopic() {
+    return topic;
+  }
+
   public boolean isSendAsync() {
     return sendAsync;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 716daf6..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -18,6 +18,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.inject.Inject;
@@ -25,7 +26,7 @@
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 
 @Singleton
-public class KafkaPublisher {
+public class KafkaPublisher implements EventListener {
 
   private final KafkaSession session;
   private final Gson gson;
@@ -46,6 +47,13 @@
     session.disconnect();
   }
 
+  @Override
+  public void onEvent(Event event) {
+    if (session.isOpen()) {
+      session.publish(gson.toJson(event));
+    }
+  }
+
   public ListenableFuture<Boolean> publish(String topic, Event event) {
     return session.publish(topic, getPayload(event));
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 2581bda..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -86,6 +86,10 @@
     producer = null;
   }
 
+  public ListenableFuture<Boolean> publish(String messageBody) {
+    return publish(properties.getTopic(), messageBody);
+  }
+
   public ListenableFuture<Boolean> publish(String topic, String messageBody) {
     if (properties.isSendAsync()) {
       return publishAsync(topic, messageBody);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index ca0b01b..bd47223 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -29,15 +29,11 @@
 import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
-import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -53,31 +49,12 @@
 import org.testcontainers.containers.KafkaContainer;
 
 @NoHttpd
-@TestPlugin(
-    name = "events-kafka",
-    sysModule = "com.googlesource.gerrit.plugins.kafka.EventConsumerIT$TestModule")
+@TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
-
   static final long KAFKA_POLL_TIMEOUT = 10000L;
-  static final String TEST_EVENTS_TOPIC = "test-events-topic";
 
   private KafkaContainer kafka;
 
-  public static class TestModule extends AbstractModule {
-    private static Module kafkaModule;
-
-    @Inject
-    TestModule(Module kafkaModule) {
-      this.kafkaModule = kafkaModule;
-    }
-
-    @Override
-    protected void configure() {
-      install(kafkaModule);
-      DynamicSet.bind(binder(), EventListener.class).to(TestKafkaEventListener.class);
-    }
-  }
-
   @Override
   public void setUpTestPlugin() throws Exception {
     try {
@@ -123,9 +100,8 @@
 
     List<String> events = new ArrayList<>();
     KafkaProperties kafkaProperties = kafkaProperties();
-
     try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
-      consumer.subscribe(Collections.singleton(TEST_EVENTS_TOPIC));
+      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
       ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
       for (ConsumerRecord<String, String> record : records) {
         events.add(record.value());
@@ -205,18 +181,4 @@
       MILLISECONDS.sleep(50);
     }
   }
-
-  public static class TestKafkaEventListener implements EventListener {
-    private final BrokerApi brokerApi;
-
-    @Inject
-    TestKafkaEventListener(BrokerApi brokerApi) {
-      this.brokerApi = brokerApi;
-    }
-
-    @Override
-    public void onEvent(Event event) {
-      brokerApi.send(TEST_EVENTS_TOPIC, event);
-    }
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index f22198d..5aa9ca8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -51,6 +51,7 @@
   @Before
   public void setUp() {
     when(producerProvider.get()).thenReturn(kafkaProducer);
+    when(properties.getTopic()).thenReturn(topic);
 
     recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
 
@@ -62,7 +63,7 @@
   public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
   }
 
@@ -70,7 +71,7 @@
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
 
@@ -79,7 +80,7 @@
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.publish("anyTopic", message);
+      objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
     }
@@ -91,7 +92,7 @@
     when(properties.isSendAsync()).thenReturn(true);
     when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
 
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
     callbackCaptor.getValue().onCompletion(recordMetadata, null);
@@ -104,7 +105,7 @@
     when(kafkaProducer.send(any(), any()))
         .thenReturn(Futures.immediateFailedFuture(new Exception()));
 
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
     callbackCaptor.getValue().onCompletion(null, new Exception());
@@ -117,7 +118,7 @@
     when(kafkaProducer.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.publish("anyTopic", message);
+      objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
     }