Revert "Remove publishing of stream events"
Revert submission 312871-centralize-stream-events-handling
Reason for revert: It broke existing functionality of events-* plugins
Reverted Changes:
I79a059deb:Remove publishing of stream events
I68906e9b4:Remove publishing of stream events
I632f7b900:Remove publishing of stream events
Iafe5a8155:Leverage stream events publishing from the events-...
Change-Id: Ibe7494cf802cb3f180a1b71cd0e0f3a95eacfc4e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index d285146..2768dd3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -16,10 +16,12 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -35,6 +37,7 @@
@Override
protected void configure() {
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
+ DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
bind(new TypeLiteral<KafkaProducer<String, String>>() {})
.toProvider(KafkaProducerProvider.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index a45549c..72d7f91 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -33,6 +33,7 @@
public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
+ private final String topic;
private final boolean sendAsync;
@Inject
@@ -40,6 +41,7 @@
super();
setDefaults();
PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+ topic = fromGerritConfig.getString("topic", "gerrit");
sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
applyConfig(fromGerritConfig);
initDockerizedKafkaServer();
@@ -49,6 +51,7 @@
public KafkaProperties(boolean sendAsync) {
super();
setDefaults();
+ topic = "gerrit";
this.sendAsync = sendAsync;
initDockerizedKafkaServer();
}
@@ -85,6 +88,10 @@
}
}
+ public String getTopic() {
+ return topic;
+ }
+
public boolean isSendAsync() {
return sendAsync;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 716daf6..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -18,6 +18,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.inject.Inject;
@@ -25,7 +26,7 @@
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
@Singleton
-public class KafkaPublisher {
+public class KafkaPublisher implements EventListener {
private final KafkaSession session;
private final Gson gson;
@@ -46,6 +47,13 @@
session.disconnect();
}
+ @Override
+ public void onEvent(Event event) {
+ if (session.isOpen()) {
+ session.publish(gson.toJson(event));
+ }
+ }
+
public ListenableFuture<Boolean> publish(String topic, Event event) {
return session.publish(topic, getPayload(event));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 2581bda..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -86,6 +86,10 @@
producer = null;
}
+ public ListenableFuture<Boolean> publish(String messageBody) {
+ return publish(properties.getTopic(), messageBody);
+ }
+
public ListenableFuture<Boolean> publish(String topic, String messageBody) {
if (properties.isSendAsync()) {
return publishAsync(topic, messageBody);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index ca0b01b..bd47223 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -29,15 +29,11 @@
import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.common.ChangeMessageInfo;
-import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import java.time.Duration;
import java.util.ArrayList;
@@ -53,31 +49,12 @@
import org.testcontainers.containers.KafkaContainer;
@NoHttpd
-@TestPlugin(
- name = "events-kafka",
- sysModule = "com.googlesource.gerrit.plugins.kafka.EventConsumerIT$TestModule")
+@TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
public class EventConsumerIT extends LightweightPluginDaemonTest {
-
static final long KAFKA_POLL_TIMEOUT = 10000L;
- static final String TEST_EVENTS_TOPIC = "test-events-topic";
private KafkaContainer kafka;
- public static class TestModule extends AbstractModule {
- private static Module kafkaModule;
-
- @Inject
- TestModule(Module kafkaModule) {
- this.kafkaModule = kafkaModule;
- }
-
- @Override
- protected void configure() {
- install(kafkaModule);
- DynamicSet.bind(binder(), EventListener.class).to(TestKafkaEventListener.class);
- }
- }
-
@Override
public void setUpTestPlugin() throws Exception {
try {
@@ -123,9 +100,8 @@
List<String> events = new ArrayList<>();
KafkaProperties kafkaProperties = kafkaProperties();
-
try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
- consumer.subscribe(Collections.singleton(TEST_EVENTS_TOPIC));
+ consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
for (ConsumerRecord<String, String> record : records) {
events.add(record.value());
@@ -205,18 +181,4 @@
MILLISECONDS.sleep(50);
}
}
-
- public static class TestKafkaEventListener implements EventListener {
- private final BrokerApi brokerApi;
-
- @Inject
- TestKafkaEventListener(BrokerApi brokerApi) {
- this.brokerApi = brokerApi;
- }
-
- @Override
- public void onEvent(Event event) {
- brokerApi.send(TEST_EVENTS_TOPIC, event);
- }
- }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index f22198d..5aa9ca8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -51,6 +51,7 @@
@Before
public void setUp() {
when(producerProvider.get()).thenReturn(kafkaProducer);
+ when(properties.getTopic()).thenReturn(topic);
recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
@@ -62,7 +63,7 @@
public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
}
@@ -70,7 +71,7 @@
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
}
@@ -79,7 +80,7 @@
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
} catch (RuntimeException e) {
// expected
}
@@ -91,7 +92,7 @@
when(properties.isSendAsync()).thenReturn(true);
when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(kafkaProducer).send(any(), callbackCaptor.capture());
callbackCaptor.getValue().onCompletion(recordMetadata, null);
@@ -104,7 +105,7 @@
when(kafkaProducer.send(any(), any()))
.thenReturn(Futures.immediateFailedFuture(new Exception()));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(kafkaProducer).send(any(), callbackCaptor.capture());
callbackCaptor.getValue().onCompletion(null, new Exception());
@@ -117,7 +118,7 @@
when(kafkaProducer.send(any(), any()))
.thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
} catch (RuntimeException e) {
// expected
}