Only publish stream events when sendStreamEvents is set.
The publishing of stream events is only enabled when the
"sendStreamEvents" configuration is set as follows:
[plugin "events-kafka"]
sendStreamEvents=true
Note that this constitutes a breaking change compared to the previous
stable version (stable-3.4), in which stream events were published by
default.
This is because multi-site now already publishes stream events
[see Iafe5a8155] by binding directly the StreamEventsPublisher from the
events-broker library. Thus direct publishing from events-kafka would be
redundant and cause twice as many messages to be published.
Bug: Issue 14910
Change-Id: Ie77ad7d10fe963568499c17aa6faa512006e30db
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 2768dd3..618fea8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -21,6 +21,7 @@
import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -28,16 +29,21 @@
class Module extends AbstractModule {
private final KafkaApiModule kafkaBrokerModule;
+ private final KafkaPublisherProperties configuration;
@Inject
- public Module(KafkaApiModule kafkaBrokerModule) {
+ public Module(KafkaApiModule kafkaBrokerModule, KafkaPublisherProperties configuration) {
this.kafkaBrokerModule = kafkaBrokerModule;
+ this.configuration = configuration;
}
@Override
protected void configure() {
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
- DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+
+ if (configuration.isSendStreamEvents()) {
+ DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+ }
bind(new TypeLiteral<KafkaProducer<String, String>>() {})
.toProvider(KafkaProducerProvider.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 72d7f91..d7be825 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -35,6 +35,7 @@
private final String topic;
private final boolean sendAsync;
+ private final boolean sendStreamEvents;
@Inject
public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -43,6 +44,7 @@
PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
topic = fromGerritConfig.getString("topic", "gerrit");
sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+ sendStreamEvents = fromGerritConfig.getBoolean("sendStreamEvents", false);
applyConfig(fromGerritConfig);
initDockerizedKafkaServer();
}
@@ -53,6 +55,7 @@
setDefaults();
topic = "gerrit";
this.sendAsync = sendAsync;
+ this.sendStreamEvents = true;
initDockerizedKafkaServer();
}
@@ -99,4 +102,8 @@
public String getBootstrapServers() {
return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
+
+ public boolean isSendStreamEvents() {
+ return sendStreamEvents;
+ }
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 727b7e5..0ec8e2a 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -48,3 +48,12 @@
: Send messages to Kafka asynchronously, detaching the calling process from the
acknowledge of the message being sent.
Default: true
+
+`plugin.@PLUGIN@.topic`
+: Send all Gerrit stream events to this topic (when `sendStreamEvents` is set
+ to `true`).
+ Default: gerrit
+
+`plugin.@PLUGIN@.sendStreamEvents`
+: Whether to publish Gerrit stream events to the topic specified by `topic`.
+ Default: false
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index bd47223..6e654bf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -86,27 +86,9 @@
@GerritConfig(
name = "plugin.events-kafka.valueDeserializer",
value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "true")
public void consumeEvents() throws Exception {
- PushOneCommit.Result r = createChange();
-
- ReviewInput in = ReviewInput.recommend();
- in.message = "LGTM";
- gApi.changes().id(r.getChangeId()).revision("current").review(in);
- List<ChangeMessageInfo> messages =
- new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
- assertThat(messages).hasSize(2);
- String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
- assertThat(messages.get(1).message).isEqualTo(expectedMessage);
-
- List<String> events = new ArrayList<>();
- KafkaProperties kafkaProperties = kafkaProperties();
- try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
- consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
- ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
- for (ConsumerRecord<String, String> record : records) {
- events.add(record.value());
- }
- }
+ List<String> events = reviewNewChangeAndGetStreamEvents();
// There are 6 events are received in the following order:
// 1. refUpdate: ref: refs/sequences/changes
@@ -124,7 +106,23 @@
assertThat(event).isInstanceOf(CommentAddedEvent.class);
CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
- assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+ assertThat(commentAddedEvent.comment).isEqualTo("Patch Set 1: Code-Review+1\n\nLGTM");
+ }
+
+ @Test
+ @UseLocalDisk
+ @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+ @GerritConfig(
+ name = "plugin.events-kafka.keyDeserializer",
+ value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(
+ name = "plugin.events-kafka.valueDeserializer",
+ value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "false")
+ public void shouldNotSendStreamEventsWhenDisabled() throws Exception {
+ List<String> events = reviewNewChangeAndGetStreamEvents();
+
+ assertThat(events).isEmpty();
}
@Test
@@ -169,6 +167,30 @@
return plugin.getSysInjector().getInstance(KafkaProperties.class);
}
+ private List<String> reviewNewChangeAndGetStreamEvents() throws Exception {
+ PushOneCommit.Result r = createChange();
+
+ ReviewInput in = ReviewInput.recommend();
+ in.message = "LGTM";
+ gApi.changes().id(r.getChangeId()).revision("current").review(in);
+ List<ChangeMessageInfo> messages =
+ new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+ assertThat(messages).hasSize(2);
+ String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+ assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+
+ List<String> events = new ArrayList<>();
+ KafkaProperties kafkaProperties = kafkaProperties();
+ try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+ consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+ ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+ for (ConsumerRecord<String, String> record : records) {
+ events.add(record.value());
+ }
+ }
+ return events;
+ }
+
// XXX: Remove this method when merging into stable-3.3, since waitUntil is
// available in Gerrit core.
public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)