Merge changes from topic "revert-312871-centralize-stream-events-handling-JLRJXKYEBH"
* changes:
Introduce InitStep
Document numberOfSubscribers configuration
Only publish stream events when sendStreamEvents is set.
Revert "Remove publishing of stream events"
diff --git a/BUILD b/BUILD
index c0eab0c..1604d58 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
srcs = glob(["src/main/java/**/*.java"]),
manifest_entries = [
"Gerrit-PluginName: events-kafka",
+ "Gerrit-InitStep: com.googlesource.gerrit.plugins.kafka.InitConfig",
"Gerrit-Module: com.googlesource.gerrit.plugins.kafka.Module",
"Implementation-Title: Gerrit Apache Kafka plugin",
"Implementation-URL: https://gerrit.googlesource.com/plugins/events-kafka",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
new file mode 100644
index 0000000..0a99fcb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_POLLING_INTERVAL_MS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+ private static final String GROUP_ID_FIELD = "groupId";
+ private static final String POLLING_INTERVAL_FIELD = "pollingIntervalMs";
+ private static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+
+ private final Section pluginSection;
+ private final String pluginName;
+ private final ConsoleUI ui;
+ private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+ @Inject
+ InitConfig(
+ Section.Factory sections,
+ @PluginName String pluginName,
+ GerritInstanceIdProvider gerritInstanceIdProvider,
+ ConsoleUI ui) {
+ this.pluginName = pluginName;
+ this.ui = ui;
+ this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+ this.pluginSection = sections.get("plugin", pluginName);
+ }
+
+ @Override
+ public void run() throws Exception {
+ ui.header(String.format("%s plugin", pluginName));
+
+ boolean sendStreamEvents = ui.yesno(SEND_STREAM_EVENTS_DEFAULT, "Should send stream events?");
+ pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+ if (sendStreamEvents) {
+ pluginSection.string(
+ "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+ }
+
+ boolean sendAsync = ui.yesno(SEND_ASYNC_DEFAULT, "Should send messages asynchronously?");
+ pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+
+ pluginSection.string(
+ "Polling interval (ms)", POLLING_INTERVAL_FIELD, DEFAULT_POLLING_INTERVAL_MS);
+
+ pluginSection.string(
+ "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+ String consumerGroup =
+ pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+ while (Strings.isNullOrEmpty(consumerGroup) && !ui.isBatch()) {
+ ui.message("'%s' is mandatory. Please specify a value.", GROUP_ID_FIELD);
+ consumerGroup =
+ pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+ }
+
+ if (Strings.isNullOrEmpty(consumerGroup) && ui.isBatch()) {
+ System.err.printf(
+ "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+ pluginName, GROUP_ID_FIELD, pluginName);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index d285146..618fea8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -16,26 +16,35 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import org.apache.kafka.clients.producer.KafkaProducer;
class Module extends AbstractModule {
private final KafkaApiModule kafkaBrokerModule;
+ private final KafkaPublisherProperties configuration;
@Inject
- public Module(KafkaApiModule kafkaBrokerModule) {
+ public Module(KafkaApiModule kafkaBrokerModule, KafkaPublisherProperties configuration) {
this.kafkaBrokerModule = kafkaBrokerModule;
+ this.configuration = configuration;
}
@Override
protected void configure() {
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
+ if (configuration.isSendStreamEvents()) {
+ DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+ }
+
bind(new TypeLiteral<KafkaProducer<String, String>>() {})
.toProvider(KafkaProducerProvider.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index a45549c..0c58c1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -30,17 +30,29 @@
@Singleton
public class KafkaProperties extends java.util.Properties {
private static final long serialVersionUID = 0L;
+ public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+ public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+ public static final String SEND_ASYNC_FIELD = "sendAsync";
+
+ public static final Boolean SEND_STREAM_EVENTS_DEFAULT = false;
+ public static final String STREAM_EVENTS_TOPIC_DEFAULT = "gerrit";
+ public static final Boolean SEND_ASYNC_DEFAULT = true;
public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
+ private final String topic;
private final boolean sendAsync;
+ private final boolean sendStreamEvents;
@Inject
public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
super();
setDefaults();
PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
- sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+ topic = fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+ sendAsync = fromGerritConfig.getBoolean(SEND_ASYNC_FIELD, SEND_ASYNC_DEFAULT);
+ sendStreamEvents =
+ fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
applyConfig(fromGerritConfig);
initDockerizedKafkaServer();
}
@@ -49,7 +61,9 @@
public KafkaProperties(boolean sendAsync) {
super();
setDefaults();
+ topic = "gerrit";
this.sendAsync = sendAsync;
+ this.sendStreamEvents = true;
initDockerizedKafkaServer();
}
@@ -85,6 +99,10 @@
}
}
+ public String getTopic() {
+ return topic;
+ }
+
public boolean isSendAsync() {
return sendAsync;
}
@@ -92,4 +110,8 @@
public String getBootstrapServers() {
return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
+
+ public boolean isSendStreamEvents() {
+ return sendStreamEvents;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 52d4726..94e8e62 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -23,8 +23,8 @@
@Singleton
public class KafkaSubscriberProperties extends KafkaProperties {
private static final long serialVersionUID = 1L;
- private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
- private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ public static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+ public static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
private final Integer pollingInterval;
private final String groupId;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 716daf6..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -18,6 +18,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.inject.Inject;
@@ -25,7 +26,7 @@
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
@Singleton
-public class KafkaPublisher {
+public class KafkaPublisher implements EventListener {
private final KafkaSession session;
private final Gson gson;
@@ -46,6 +47,13 @@
session.disconnect();
}
+ @Override
+ public void onEvent(Event event) {
+ if (session.isOpen()) {
+ session.publish(gson.toJson(event));
+ }
+ }
+
public ListenableFuture<Boolean> publish(String topic, Event event) {
return session.publish(topic, getPayload(event));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 2581bda..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -86,6 +86,10 @@
producer = null;
}
+ public ListenableFuture<Boolean> publish(String messageBody) {
+ return publish(properties.getTopic(), messageBody);
+ }
+
public ListenableFuture<Boolean> publish(String topic, String messageBody) {
if (properties.isSendAsync()) {
return publishAsync(topic, messageBody);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 727b7e5..a10004f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -44,7 +44,41 @@
: Polling interval in msec for receiving messages from Kafka topic subscription.
Default: 1000
+`plugin.@PLUGIN@.numberOfSubscribers`
+: The number of consumers that are expected to be executed. This number will
+ be used to allocate a thread pool of a suitable size.
+ Default to `6`. This is to allow enough resources to consume all relevant
+ gerrit topics in a multi-site deployment: `batchIndexEventTopic`
+ `streamEventTopic`, `gerritTopic`, `projectListEventTopic`,
+ `cacheEventTopic`, `indexEventTopic`
+
`plugin.@PLUGIN@.sendAsync`
: Send messages to Kafka asynchronously, detaching the calling process from the
acknowledge of the message being sent.
Default: true
+
+`plugin.@PLUGIN@.topic`
+: Send all gerrit stream events to this topic (when `sendStreamEvents` is set
+ to `true`).
+ Default: gerrit
+
+`plugin.@PLUGIN@.sendStreamEvents`
+: Whether to send stream events to the `topic` topic.
+ Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```shell
+*** events-kafka plugin
+***
+
+Should send stream events? [y/N]? y
+Stream events topic [gerrit]: gerrit_stream_events
+Should send messages asynchronously? [Y/n]? y
+Polling interval (ms) [1000]: 3000
+Number of subscribers [6]: 6
+Consumer group [my_group_id]: my_group_id
+```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index ca0b01b..6e654bf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -29,15 +29,11 @@
import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.common.ChangeMessageInfo;
-import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import java.time.Duration;
import java.util.ArrayList;
@@ -53,31 +49,12 @@
import org.testcontainers.containers.KafkaContainer;
@NoHttpd
-@TestPlugin(
- name = "events-kafka",
- sysModule = "com.googlesource.gerrit.plugins.kafka.EventConsumerIT$TestModule")
+@TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
public class EventConsumerIT extends LightweightPluginDaemonTest {
-
static final long KAFKA_POLL_TIMEOUT = 10000L;
- static final String TEST_EVENTS_TOPIC = "test-events-topic";
private KafkaContainer kafka;
- public static class TestModule extends AbstractModule {
- private static Module kafkaModule;
-
- @Inject
- TestModule(Module kafkaModule) {
- this.kafkaModule = kafkaModule;
- }
-
- @Override
- protected void configure() {
- install(kafkaModule);
- DynamicSet.bind(binder(), EventListener.class).to(TestKafkaEventListener.class);
- }
- }
-
@Override
public void setUpTestPlugin() throws Exception {
try {
@@ -109,28 +86,9 @@
@GerritConfig(
name = "plugin.events-kafka.valueDeserializer",
value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "true")
public void consumeEvents() throws Exception {
- PushOneCommit.Result r = createChange();
-
- ReviewInput in = ReviewInput.recommend();
- in.message = "LGTM";
- gApi.changes().id(r.getChangeId()).revision("current").review(in);
- List<ChangeMessageInfo> messages =
- new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
- assertThat(messages).hasSize(2);
- String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
- assertThat(messages.get(1).message).isEqualTo(expectedMessage);
-
- List<String> events = new ArrayList<>();
- KafkaProperties kafkaProperties = kafkaProperties();
-
- try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
- consumer.subscribe(Collections.singleton(TEST_EVENTS_TOPIC));
- ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
- for (ConsumerRecord<String, String> record : records) {
- events.add(record.value());
- }
- }
+ List<String> events = reviewNewChangeAndGetStreamEvents();
// There are 6 events are received in the following order:
// 1. refUpdate: ref: refs/sequences/changes
@@ -148,7 +106,23 @@
assertThat(event).isInstanceOf(CommentAddedEvent.class);
CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
- assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+ assertThat(commentAddedEvent.comment).isEqualTo("Patch Set 1: Code-Review+1\n\nLGTM");
+ }
+
+ @Test
+ @UseLocalDisk
+ @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+ @GerritConfig(
+ name = "plugin.events-kafka.keyDeserializer",
+ value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(
+ name = "plugin.events-kafka.valueDeserializer",
+ value = "org.apache.kafka.common.serialization.StringDeserializer")
+ @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "false")
+ public void shouldNotSendStreamEventsWhenDisabled() throws Exception {
+ List<String> events = reviewNewChangeAndGetStreamEvents();
+
+ assertThat(events).isEmpty();
}
@Test
@@ -193,6 +167,30 @@
return plugin.getSysInjector().getInstance(KafkaProperties.class);
}
+ private List<String> reviewNewChangeAndGetStreamEvents() throws Exception {
+ PushOneCommit.Result r = createChange();
+
+ ReviewInput in = ReviewInput.recommend();
+ in.message = "LGTM";
+ gApi.changes().id(r.getChangeId()).revision("current").review(in);
+ List<ChangeMessageInfo> messages =
+ new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+ assertThat(messages).hasSize(2);
+ String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+ assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+
+ List<String> events = new ArrayList<>();
+ KafkaProperties kafkaProperties = kafkaProperties();
+ try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+ consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+ ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+ for (ConsumerRecord<String, String> record : records) {
+ events.add(record.value());
+ }
+ }
+ return events;
+ }
+
// XXX: Remove this method when merging into stable-3.3, since waitUntil is
// available in Gerrit core.
public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
@@ -205,18 +203,4 @@
MILLISECONDS.sleep(50);
}
}
-
- public static class TestKafkaEventListener implements EventListener {
- private final BrokerApi brokerApi;
-
- @Inject
- TestKafkaEventListener(BrokerApi brokerApi) {
- this.brokerApi = brokerApi;
- }
-
- @Override
- public void onEvent(Event event) {
- brokerApi.send(TEST_EVENTS_TOPIC, event);
- }
- }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index f22198d..5aa9ca8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -51,6 +51,7 @@
@Before
public void setUp() {
when(producerProvider.get()).thenReturn(kafkaProducer);
+ when(properties.getTopic()).thenReturn(topic);
recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
@@ -62,7 +63,7 @@
public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
}
@@ -70,7 +71,7 @@
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
}
@@ -79,7 +80,7 @@
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
} catch (RuntimeException e) {
// expected
}
@@ -91,7 +92,7 @@
when(properties.isSendAsync()).thenReturn(true);
when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(kafkaProducer).send(any(), callbackCaptor.capture());
callbackCaptor.getValue().onCompletion(recordMetadata, null);
@@ -104,7 +105,7 @@
when(kafkaProducer.send(any(), any()))
.thenReturn(Futures.immediateFailedFuture(new Exception()));
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
verify(kafkaProducer).send(any(), callbackCaptor.capture());
callbackCaptor.getValue().onCompletion(null, new Exception());
@@ -117,7 +118,7 @@
when(kafkaProducer.send(any(), any()))
.thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.publish("anyTopic", message);
+ objectUnderTest.publish(message);
} catch (RuntimeException e) {
// expected
}