Only publish stream events when sendStreamEvents is set.

The publishing of stream events is only enabled when the
"sendStreamEvents' configuration is set as follows:

[plugin "events-kafka"]
   sendStreamEvents=true

Note that this constitutes a breaking change compared to the previous
stable version (stable-3.4), in which stream events were published by
default.

This is because multi-site now already publishes stream events
[see Iafe5a8155] by binding directly the StreamEventsPublisher from the
events-broker library. Thus direct publishing from events-kafka would be
redundant and cause twice as many messages to be published.

Bug: Issue 14910
Change-Id: Ie77ad7d10fe963568499c17aa6faa512006e30db
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 2768dd3..618fea8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -21,6 +21,7 @@
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -28,16 +29,21 @@
 class Module extends AbstractModule {
 
   private final KafkaApiModule kafkaBrokerModule;
+  private final KafkaPublisherProperties configuration;
 
   @Inject
-  public Module(KafkaApiModule kafkaBrokerModule) {
+  public Module(KafkaApiModule kafkaBrokerModule, KafkaPublisherProperties configuration) {
     this.kafkaBrokerModule = kafkaBrokerModule;
+    this.configuration = configuration;
   }
 
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
-    DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+
+    if (configuration.isSendStreamEvents()) {
+      DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+    }
 
     bind(new TypeLiteral<KafkaProducer<String, String>>() {})
         .toProvider(KafkaProducerProvider.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 72d7f91..d7be825 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -35,6 +35,7 @@
 
   private final String topic;
   private final boolean sendAsync;
+  private final boolean sendStreamEvents;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -43,6 +44,7 @@
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
     topic = fromGerritConfig.getString("topic", "gerrit");
     sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+    sendStreamEvents = fromGerritConfig.getBoolean("sendStreamEvents", false);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
@@ -53,6 +55,7 @@
     setDefaults();
     topic = "gerrit";
     this.sendAsync = sendAsync;
+    this.sendStreamEvents = true;
     initDockerizedKafkaServer();
   }
 
@@ -99,4 +102,8 @@
   public String getBootstrapServers() {
     return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
+
+  public boolean isSendStreamEvents() {
+    return sendStreamEvents;
+  }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 727b7e5..0ec8e2a 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -48,3 +48,12 @@
 :	Send messages to Kafka asynchronously, detaching the calling process from the
 	acknowledge of the message being sent.
 	Default: true
+
+`plugin.@PLUGIN@.topic`
+:   Send all gerrit stream events to this topic (when `sendStreamEvents` is set
+    to `true`).
+    Default: gerrit
+
+`plugin.@PLUGIN@.sendStreamEvents`
+:   Whether to send stream events to the `topic` topic.
+    Default: false
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index bd47223..6e654bf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -86,27 +86,9 @@
   @GerritConfig(
       name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "true")
   public void consumeEvents() throws Exception {
-    PushOneCommit.Result r = createChange();
-
-    ReviewInput in = ReviewInput.recommend();
-    in.message = "LGTM";
-    gApi.changes().id(r.getChangeId()).revision("current").review(in);
-    List<ChangeMessageInfo> messages =
-        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
-    assertThat(messages).hasSize(2);
-    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
-    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
-
-    List<String> events = new ArrayList<>();
-    KafkaProperties kafkaProperties = kafkaProperties();
-    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
-      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
-      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
-      for (ConsumerRecord<String, String> record : records) {
-        events.add(record.value());
-      }
-    }
+    List<String> events = reviewNewChangeAndGetStreamEvents();
 
     // There are 6 events are received in the following order:
     // 1. refUpdate:        ref: refs/sequences/changes
@@ -124,7 +106,23 @@
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
     CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
-    assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+    assertThat(commentAddedEvent.comment).isEqualTo("Patch Set 1: Code-Review+1\n\nLGTM");
+  }
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "false")
+  public void shouldNotSendStreamEventsWhenDisabled() throws Exception {
+    List<String> events = reviewNewChangeAndGetStreamEvents();
+
+    assertThat(events).isEmpty();
   }
 
   @Test
@@ -169,6 +167,30 @@
     return plugin.getSysInjector().getInstance(KafkaProperties.class);
   }
 
+  private List<String> reviewNewChangeAndGetStreamEvents() throws Exception {
+    PushOneCommit.Result r = createChange();
+
+    ReviewInput in = ReviewInput.recommend();
+    in.message = "LGTM";
+    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+    List<ChangeMessageInfo> messages =
+        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+    assertThat(messages).hasSize(2);
+    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+
+    List<String> events = new ArrayList<>();
+    KafkaProperties kafkaProperties = kafkaProperties();
+    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+      for (ConsumerRecord<String, String> record : records) {
+        events.add(record.value());
+      }
+    }
+    return events;
+  }
+
   // XXX: Remove this method when merging into stable-3.3, since waitUntil is
   // available in Gerrit core.
   public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)