Merge changes from topic "revert-312871-centralize-stream-events-handling-JLRJXKYEBH"

* changes:
  Introduce InitStep
  Document numberOfSubscribers configuration
  Only publish stream events when sendStreamEvents is set.
  Revert "Remove publishing of stream events"
diff --git a/BUILD b/BUILD
index c0eab0c..1604d58 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
     srcs = glob(["src/main/java/**/*.java"]),
     manifest_entries = [
         "Gerrit-PluginName: events-kafka",
+        "Gerrit-InitStep: com.googlesource.gerrit.plugins.kafka.InitConfig",
         "Gerrit-Module: com.googlesource.gerrit.plugins.kafka.Module",
         "Implementation-Title: Gerrit Apache Kafka plugin",
         "Implementation-URL: https://gerrit.googlesource.com/plugins/events-kafka",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
new file mode 100644
index 0000000..0a99fcb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_POLLING_INTERVAL_MS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep { // init step that fills this plugin's config section
+  private static final String GROUP_ID_FIELD = "groupId";
+  private static final String POLLING_INTERVAL_FIELD = "pollingIntervalMs";
+  private static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+
+  private final Section pluginSection;
+  private final String pluginName;
+  private final ConsoleUI ui;
+  private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+  @Inject
+  InitConfig(
+      Section.Factory sections,
+      @PluginName String pluginName,
+      GerritInstanceIdProvider gerritInstanceIdProvider,
+      ConsoleUI ui) {
+    this.pluginName = pluginName;
+    this.ui = ui;
+    this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+    this.pluginSection = sections.get("plugin", pluginName); // section "plugin", keyed by name
+  }
+
+  @Override
+  public void run() throws Exception {
+    ui.header(String.format("%s plugin", pluginName));
+    // Stream events: ask whether to publish them; the topic is only asked when enabled.
+    boolean sendStreamEvents = ui.yesno(SEND_STREAM_EVENTS_DEFAULT, "Should send stream events?");
+    pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+    if (sendStreamEvents) {
+      pluginSection.string(
+          "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+    }
+    // Delivery mode for published messages.
+    boolean sendAsync = ui.yesno(SEND_ASYNC_DEFAULT, "Should send messages asynchronously?");
+    pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+    // Subscriber settings.
+    pluginSection.string(
+        "Polling interval (ms)", POLLING_INTERVAL_FIELD, DEFAULT_POLLING_INTERVAL_MS);
+
+    pluginSection.string(
+        "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+    // Mandatory consumer group: defaults to the instance id; re-prompted interactively if empty.
+    String consumerGroup =
+        pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    while (Strings.isNullOrEmpty(consumerGroup) && !ui.isBatch()) {
+      ui.message("'%s' is mandatory. Please specify a value.", GROUP_ID_FIELD);
+      consumerGroup =
+          pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    }
+    // In batch mode the loop above is skipped, so a missing value is only reported on stderr.
+    if (Strings.isNullOrEmpty(consumerGroup) && ui.isBatch()) {
+      System.err.printf(
+          "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+          pluginName, GROUP_ID_FIELD, pluginName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index d285146..618fea8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -16,26 +16,35 @@
 
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 class Module extends AbstractModule {
 
   private final KafkaApiModule kafkaBrokerModule;
+  private final KafkaPublisherProperties configuration;
 
   @Inject
-  public Module(KafkaApiModule kafkaBrokerModule) {
+  public Module(KafkaApiModule kafkaBrokerModule, KafkaPublisherProperties configuration) {
     this.kafkaBrokerModule = kafkaBrokerModule;
+    this.configuration = configuration;
   }
 
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
 
+    if (configuration.isSendStreamEvents()) {
+      DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+    }
+
     bind(new TypeLiteral<KafkaProducer<String, String>>() {})
         .toProvider(KafkaProducerProvider.class);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index a45549c..0c58c1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -30,17 +30,29 @@
 @Singleton
 public class KafkaProperties extends java.util.Properties {
   private static final long serialVersionUID = 0L;
+  public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+  public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+  public static final String SEND_ASYNC_FIELD = "sendAsync";
+
+  public static final Boolean SEND_STREAM_EVENTS_DEFAULT = false;
+  public static final String STREAM_EVENTS_TOPIC_DEFAULT = "gerrit";
+  public static final Boolean SEND_ASYNC_DEFAULT = true;
 
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
+  private final String topic;
   private final boolean sendAsync;
+  private final boolean sendStreamEvents;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+    topic = fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+    sendAsync = fromGerritConfig.getBoolean(SEND_ASYNC_FIELD, SEND_ASYNC_DEFAULT);
+    sendStreamEvents =
+        fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
@@ -49,7 +61,9 @@
   public KafkaProperties(boolean sendAsync) {
     super();
     setDefaults();
+    topic = "gerrit";
     this.sendAsync = sendAsync;
+    this.sendStreamEvents = true;
     initDockerizedKafkaServer();
   }
 
@@ -85,6 +99,10 @@
     }
   }
 
+  public String getTopic() {
+    return topic;
+  }
+
   public boolean isSendAsync() {
     return sendAsync;
   }
@@ -92,4 +110,8 @@
   public String getBootstrapServers() {
     return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
+
+  public boolean isSendStreamEvents() {
+    return sendStreamEvents;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 52d4726..94e8e62 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -23,8 +23,8 @@
 @Singleton
 public class KafkaSubscriberProperties extends KafkaProperties {
   private static final long serialVersionUID = 1L;
-  private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  public static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+  public static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
 
   private final Integer pollingInterval;
   private final String groupId;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 716daf6..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -18,6 +18,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.inject.Inject;
@@ -25,7 +26,7 @@
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 
 @Singleton
-public class KafkaPublisher {
+public class KafkaPublisher implements EventListener {
 
   private final KafkaSession session;
   private final Gson gson;
@@ -46,6 +47,13 @@
     session.disconnect();
   }
 
+  @Override
+  public void onEvent(Event event) {
+    if (session.isOpen()) { // events are silently skipped while the session is not open
+      session.publish(gson.toJson(event)); // JSON-serialized event; returned future is ignored
+    }
+  }
+
   public ListenableFuture<Boolean> publish(String topic, Event event) {
     return session.publish(topic, getPayload(event));
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 2581bda..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -86,6 +86,10 @@
     producer = null;
   }
 
+  public ListenableFuture<Boolean> publish(String messageBody) {
+    return publish(properties.getTopic(), messageBody); // delegate with the topic from properties
+  }
+
   public ListenableFuture<Boolean> publish(String topic, String messageBody) {
     if (properties.isSendAsync()) {
       return publishAsync(topic, messageBody);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 727b7e5..a10004f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -44,7 +44,41 @@
 :	Polling interval in msec for receiving messages from Kafka topic subscription.
 	Default: 1000
 
+`plugin.@PLUGIN@.numberOfSubscribers`
+:   The number of consumers that are expected to be executed. This number will
+    be used to allocate a thread pool of a suitable size.
+    Defaults to `6`. This is to allow enough resources to consume all relevant
+    gerrit topics in a multi-site deployment: `batchIndexEventTopic`,
+    `streamEventTopic`, `gerritTopic`, `projectListEventTopic`,
+    `cacheEventTopic`, `indexEventTopic`.
+
 `plugin.@PLUGIN@.sendAsync`
 :	Send messages to Kafka asynchronously, detaching the calling process from the
 	acknowledge of the message being sent.
 	Default: true
+
+`plugin.@PLUGIN@.topic`
+:   Send all gerrit stream events to this topic (when `sendStreamEvents` is set
+    to `true`).
+    Default: gerrit
+
+`plugin.@PLUGIN@.sendStreamEvents`
+:   Whether to send stream events to the topic configured by `plugin.@PLUGIN@.topic`.
+    Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```shell
+*** events-kafka plugin
+***
+
+Should send stream events?     [y/N]? y
+Stream events topic            [gerrit]: gerrit_stream_events
+Should send messages asynchronously? [Y/n]? y
+Polling interval (ms)          [1000]: 3000
+Number of subscribers          [6]: 6
+Consumer group                 [my_group_id]: my_group_id
+```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index ca0b01b..6e654bf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -29,15 +29,11 @@
 import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
-import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -53,31 +49,12 @@
 import org.testcontainers.containers.KafkaContainer;
 
 @NoHttpd
-@TestPlugin(
-    name = "events-kafka",
-    sysModule = "com.googlesource.gerrit.plugins.kafka.EventConsumerIT$TestModule")
+@TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
-
   static final long KAFKA_POLL_TIMEOUT = 10000L;
-  static final String TEST_EVENTS_TOPIC = "test-events-topic";
 
   private KafkaContainer kafka;
 
-  public static class TestModule extends AbstractModule {
-    private static Module kafkaModule;
-
-    @Inject
-    TestModule(Module kafkaModule) {
-      this.kafkaModule = kafkaModule;
-    }
-
-    @Override
-    protected void configure() {
-      install(kafkaModule);
-      DynamicSet.bind(binder(), EventListener.class).to(TestKafkaEventListener.class);
-    }
-  }
-
   @Override
   public void setUpTestPlugin() throws Exception {
     try {
@@ -109,28 +86,9 @@
   @GerritConfig(
       name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "true")
   public void consumeEvents() throws Exception {
-    PushOneCommit.Result r = createChange();
-
-    ReviewInput in = ReviewInput.recommend();
-    in.message = "LGTM";
-    gApi.changes().id(r.getChangeId()).revision("current").review(in);
-    List<ChangeMessageInfo> messages =
-        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
-    assertThat(messages).hasSize(2);
-    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
-    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
-
-    List<String> events = new ArrayList<>();
-    KafkaProperties kafkaProperties = kafkaProperties();
-
-    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
-      consumer.subscribe(Collections.singleton(TEST_EVENTS_TOPIC));
-      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
-      for (ConsumerRecord<String, String> record : records) {
-        events.add(record.value());
-      }
-    }
+    List<String> events = reviewNewChangeAndGetStreamEvents();
 
     // There are 6 events are received in the following order:
     // 1. refUpdate:        ref: refs/sequences/changes
@@ -148,7 +106,23 @@
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
     CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
-    assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+    assertThat(commentAddedEvent.comment).isEqualTo("Patch Set 1: Code-Review+1\n\nLGTM");
+  }
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "false")
+  public void shouldNotSendStreamEventsWhenDisabled() throws Exception {
+    List<String> events = reviewNewChangeAndGetStreamEvents();
+    // With sendStreamEvents=false the poll of the configured topic is expected to return nothing.
+    assertThat(events).isEmpty();
   }
 
   @Test
@@ -193,6 +167,30 @@
     return plugin.getSysInjector().getInstance(KafkaProperties.class);
   }
 
+  private List<String> reviewNewChangeAndGetStreamEvents() throws Exception {
+    PushOneCommit.Result r = createChange();
+    // Post an "LGTM" recommend review and check it appears as the second change message.
+    ReviewInput in = ReviewInput.recommend();
+    in.message = "LGTM";
+    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+    List<ChangeMessageInfo> messages =
+        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+    assertThat(messages).hasSize(2);
+    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+    // Collect the record values returned by a single poll of the plugin's configured topic.
+    List<String> events = new ArrayList<>();
+    KafkaProperties kafkaProperties = kafkaProperties();
+    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+      for (ConsumerRecord<String, String> record : records) {
+        events.add(record.value());
+      }
+    }
+    return events;
+  }
+
   // XXX: Remove this method when merging into stable-3.3, since waitUntil is
   // available in Gerrit core.
   public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
@@ -205,18 +203,4 @@
       MILLISECONDS.sleep(50);
     }
   }
-
-  public static class TestKafkaEventListener implements EventListener {
-    private final BrokerApi brokerApi;
-
-    @Inject
-    TestKafkaEventListener(BrokerApi brokerApi) {
-      this.brokerApi = brokerApi;
-    }
-
-    @Override
-    public void onEvent(Event event) {
-      brokerApi.send(TEST_EVENTS_TOPIC, event);
-    }
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index f22198d..5aa9ca8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -51,6 +51,7 @@
   @Before
   public void setUp() {
     when(producerProvider.get()).thenReturn(kafkaProducer);
+    when(properties.getTopic()).thenReturn(topic);
 
     recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
 
@@ -62,7 +63,7 @@
   public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
   }
 
@@ -70,7 +71,7 @@
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
 
@@ -79,7 +80,7 @@
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.publish("anyTopic", message);
+      objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
     }
@@ -91,7 +92,7 @@
     when(properties.isSendAsync()).thenReturn(true);
     when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
 
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
     callbackCaptor.getValue().onCompletion(recordMetadata, null);
@@ -104,7 +105,7 @@
     when(kafkaProducer.send(any(), any()))
         .thenReturn(Futures.immediateFailedFuture(new Exception()));
 
-    objectUnderTest.publish("anyTopic", message);
+    objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
     callbackCaptor.getValue().onCompletion(null, new Exception());
@@ -117,7 +118,7 @@
     when(kafkaProducer.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.publish("anyTopic", message);
+      objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
     }