Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Register Kafka consumers with external groupId when plugin starts
  Make KafkaBrokerApi class implement ExtendedBrokerApi interface
  Consume events-broker from source
  Add validation dependency on events-broker module
  Add .gitignore file in the project
  Pass correct amount of arguments to Malformed event log line
  Add Kafka REST API authentication
  Fix the topic events replay Kafka REST-API
  Use Kafka REST Proxy id to subscribe to the correct instance
  Fix Kafka REST Proxy accepts header for topic meta-data
  Kafka REST Client: avoid clashes between clients
  Fix threshold of HTTP wire logging
  Delete subscription at the end of ReceiverJob
  Update kafka-client 2.1.0 -> 2.1.1
  Increase patience to 30s for shouldReplayAllEvents test
  Remove unused RequestConfigProvider
  REST ClientType: Make thread pool and timeouts configuration
  Extract configuration properties into constants
  Manage Kafka clientType when starting session
  Receive messages through Kafka REST API
  Send messages through Kafka REST API
  Abstract Publisher/Subscriber into generic interfaces
  Wait at most for 5s for an empty topic
  Assert that messages are acknowledged in KafkaBrokerApiTest
  Add Kafka REST-API container in test
  Remove access to deprecated poll(long) method
  Use explicit Kafka image:tag in tests
  Do not connect KafkaSession without bootstrap servers

Change-Id: I120c9fffe052195f31b1132e0c6fc0fd35680840
diff --git a/BUILD b/BUILD
index 6e88b11..9a517b6 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
     srcs = glob(["src/main/java/**/*.java"]),
     manifest_entries = [
         "Gerrit-PluginName: events-kafka",
+        "Gerrit-InitStep: com.googlesource.gerrit.plugins.kafka.InitConfig",
         "Gerrit-Module: com.googlesource.gerrit.plugins.kafka.Module",
         "Implementation-Title: Gerrit Apache Kafka plugin",
         "Implementation-URL: https://gerrit.googlesource.com/plugins/events-kafka",
@@ -27,16 +28,16 @@
 
 junit_tests(
     name = "events_kafka_tests",
+    timeout = "long",
     srcs = glob(["src/test/java/**/*.java"]),
     resources = glob(["src/test/resources/**/*"]),
     tags = ["events-kafka"],
-    timeout = "long",
     deps = [
         ":events-kafka__plugin_test_deps",
-        "//lib/testcontainers",
         "//plugins/events-broker",
         "@kafka-client//jar",
         "@testcontainers-kafka//jar",
+        "@testcontainers//jar",
     ],
 )
 
@@ -47,10 +48,13 @@
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":events-kafka__plugin",
         "@testcontainers-kafka//jar",
-        "//lib/jackson:jackson-annotations",
-        "//lib/testcontainers",
-        "//lib/testcontainers:docker-java-api",
-        "//lib/testcontainers:docker-java-transport",
+        "@jackson-annotations//jar",
+        "@testcontainers//jar",
+        "@docker-java-api//jar",
+        "@docker-java-transport//jar",
+        "@duct-tape//jar",
+        "@visible-assertions//jar",
+        "@jna//jar",
     ],
 )
 
diff --git a/README.md b/README.md
index 26e7ebf..432d7d8 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@
 Events can be anything, from the traditional stream events
 to the Gerrit metrics.
 
-This plugin requires Gerrit 2.13 or laster.
+This plugin requires Gerrit 2.13 or later.
 
 Environments
 ---------------------
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index db9da3a..1f23015 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -8,7 +8,65 @@
     )
 
     maven_jar(
+        name = "httpcore-nio",
+        artifact = "org.apache.httpcomponents:httpcore-nio:4.4.12",
+        sha1 = "84cd29eca842f31db02987cfedea245af020198b",
+    )
+
+    maven_jar(
+        name = "httpasyncclient",
+        artifact = "org.apache.httpcomponents:httpasyncclient:4.1.4",
+        sha1 = "f3a3240681faae3fa46b573a4c7e50cec9db0d86",
+    )
+
+    TESTCONTAINERS_VERSION = "1.15.3"
+
+    maven_jar(
+        name = "testcontainers",
+        artifact = "org.testcontainers:testcontainers:" + TESTCONTAINERS_VERSION,
+        sha1 = "95c6cfde71c2209f0c29cb14e432471e0b111880",
+    )
+
+    maven_jar(
         name = "testcontainers-kafka",
-        artifact = "org.testcontainers:kafka:1.15.0",
-        sha1 = "d34760b11ab656e08b72c1e2e9b852f037a89f90",
+        artifact = "org.testcontainers:kafka:" + TESTCONTAINERS_VERSION,
+        sha1 = "f5aa7be56babf71228872fe9983cce9555dffa13",
+    )
+
+    maven_jar(
+        name = "duct-tape",
+        artifact = "org.rnorth.duct-tape:duct-tape:1.0.8",
+        sha1 = "92edc22a9ab2f3e17c9bf700aaee377d50e8b530",
+    )
+
+    maven_jar(
+        name = "visible-assertions",
+        artifact = "org.rnorth.visible-assertions:visible-assertions:2.1.2",
+        sha1 = "20d31a578030ec8e941888537267d3123c2ad1c1",
+    )
+
+    maven_jar(
+        name = "jna",
+        artifact = "net.java.dev.jna:jna:5.5.0",
+        sha1 = "0e0845217c4907822403912ad6828d8e0b256208",
+    )
+
+    DOCKER_JAVA_VERS = "3.2.8"
+
+    maven_jar(
+        name = "docker-java-api",
+        artifact = "com.github.docker-java:docker-java-api:" + DOCKER_JAVA_VERS,
+        sha1 = "4ac22a72d546a9f3523cd4b5fabffa77c4a6ec7c",
+    )
+
+    maven_jar(
+        name = "docker-java-transport",
+        artifact = "com.github.docker-java:docker-java-transport:" + DOCKER_JAVA_VERS,
+        sha1 = "c3b5598c67d0a5e2e780bf48f520da26b9915eab",
+    )
+
+    maven_jar(
+        name = "jackson-annotations",
+        artifact = "com.fasterxml.jackson.core:jackson-annotations:2.10.3",
+        sha1 = "0f63b3b1da563767d04d2e4d3fc1ae0cdeffebe7",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
new file mode 100644
index 0000000..0a99fcb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/InitConfig.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_ASYNC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_DEFAULT;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties.DEFAULT_POLLING_INTERVAL_MS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+  private static final String GROUP_ID_FIELD = "groupId";
+  private static final String POLLING_INTERVAL_FIELD = "pollingIntervalMs";
+  private static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+
+  private final Section pluginSection;
+  private final String pluginName;
+  private final ConsoleUI ui;
+  private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+  @Inject
+  InitConfig(
+      Section.Factory sections,
+      @PluginName String pluginName,
+      GerritInstanceIdProvider gerritInstanceIdProvider,
+      ConsoleUI ui) {
+    this.pluginName = pluginName;
+    this.ui = ui;
+    this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+    this.pluginSection = sections.get("plugin", pluginName);
+  }
+
+  @Override
+  public void run() throws Exception {
+    ui.header(String.format("%s plugin", pluginName));
+
+    boolean sendStreamEvents = ui.yesno(SEND_STREAM_EVENTS_DEFAULT, "Should send stream events?");
+    pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+    if (sendStreamEvents) {
+      pluginSection.string(
+          "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
+    }
+
+    boolean sendAsync = ui.yesno(SEND_ASYNC_DEFAULT, "Should send messages asynchronously?");
+    pluginSection.set(SEND_ASYNC_FIELD, Boolean.toString(sendAsync));
+
+    pluginSection.string(
+        "Polling interval (ms)", POLLING_INTERVAL_FIELD, DEFAULT_POLLING_INTERVAL_MS);
+
+    pluginSection.string(
+        "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+    String consumerGroup =
+        pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    while (Strings.isNullOrEmpty(consumerGroup) && !ui.isBatch()) {
+      ui.message("'%s' is mandatory. Please specify a value.", GROUP_ID_FIELD);
+      consumerGroup =
+          pluginSection.string("Consumer group", GROUP_ID_FIELD, gerritInstanceIdProvider.get());
+    }
+
+    if (Strings.isNullOrEmpty(consumerGroup) && ui.isBatch()) {
+      System.err.printf(
+          "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+          pluginName, GROUP_ID_FIELD, pluginName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 57ca564..0a1ff35 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -26,6 +26,7 @@
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaRestProducer;
 import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
@@ -40,10 +41,16 @@
   private final KafkaApiModule kafkaBrokerModule;
   private final KafkaProperties kafkaConf;
   private final WorkQueue workQueue;
+  private final KafkaPublisherProperties configuration;
 
   @Inject
-  public Module(KafkaApiModule kafkaBrokerModule, KafkaProperties kafkaConf, WorkQueue workQueue) {
+  public Module(
+      KafkaApiModule kafkaBrokerModule,
+      KafkaPublisherProperties configuration,
+      KafkaProperties kafkaConf,
+      WorkQueue workQueue) {
     this.kafkaBrokerModule = kafkaBrokerModule;
+    this.configuration = configuration;
     this.kafkaConf = kafkaConf;
     this.workQueue = workQueue;
   }
@@ -51,7 +58,10 @@
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
-    DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+
+    if (configuration.isSendStreamEvents()) {
+      DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+    }
 
     ClientType clientType = kafkaConf.getClientType();
     switch (clientType) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 1b29748..05cda98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -54,6 +54,13 @@
   private static final String DEFAULT_STREAM_EVENTS_TOPIC_NAME = "gerrit";
 
   private static final long serialVersionUID = 0L;
+  public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+  public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
+  public static final String SEND_ASYNC_FIELD = "sendAsync";
+
+  public static final Boolean SEND_STREAM_EVENTS_DEFAULT = false;
+  public static final String STREAM_EVENTS_TOPIC_DEFAULT = "gerrit";
+  public static final Boolean SEND_ASYNC_DEFAULT = true;
 
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
@@ -64,6 +71,7 @@
 
   private final String topic;
   private final boolean sendAsync;
+  private final boolean sendStreamEvents;
   private final ClientType clientType;
   private final String restApiUriString;
   private final String restApiUsername;
@@ -77,6 +85,8 @@
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+    sendStreamEvents =
+        fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
     topic =
         fromGerritConfig.getString(
             PROPERTY_STREAM_EVENTS_TOPIC_NAME, DEFAULT_STREAM_EVENTS_TOPIC_NAME);
@@ -132,6 +142,7 @@
     setDefaults();
     topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
     this.sendAsync = sendAsync;
+    this.sendStreamEvents = true;
     this.clientType = clientType;
     this.restApiUriString = restApiUriString;
     initDockerizedKafkaServer();
@@ -186,6 +197,10 @@
     return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
 
+  public boolean isSendStreamEvents() {
+    return sendStreamEvents;
+  }
+
   public ClientType getClientType() {
     return clientType;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 7e6a49c..f9100ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -23,8 +23,8 @@
 @Singleton
 public class KafkaSubscriberProperties extends KafkaProperties {
   private static final long serialVersionUID = 1L;
-  private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  public static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+  public static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
 
   private final Integer pollingInterval;
   private final String groupId;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
index 469e43e..476be1e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -224,8 +224,7 @@
                   messageProcessor.accept(event);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log(
-                      "Malformed event '%s': [Exception: %s]",
-                      new String(consumerRecord.value(), UTF_8));
+                      "Malformed event '%s'", new String(consumerRecord.value(), UTF_8));
                   subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
                 }
               });
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index bfac4c5..621d769 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -58,6 +58,14 @@
 :	Polling interval in msec for receiving messages from Kafka topic subscription.
 	Default: 1000
 
+`plugin.@PLUGIN@.numberOfSubscribers`
+:   The number of consumers that are expected to be executed. This number will
+    be used to allocate a thread pool of a suitable size.
+    Default to `6`. This is to allow enough resources to consume all relevant
+    gerrit topics in a multi-site deployment: `batchIndexEventTopic`
+    `streamEventTopic`, `gerritTopic`, `projectListEventTopic`,
+    `cacheEventTopic`, `indexEventTopic`
+
 `plugin.@PLUGIN@.restApiUri`
 :	URL of the
 	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
@@ -94,6 +102,32 @@
 	acknowledge of the message being sent.
 	Default: true
 
+`plugin.@PLUGIN@.topic`
+:   Send all gerrit stream events to this topic (when `sendStreamEvents` is set
+    to `true`).
+    Default: gerrit
+
+`plugin.@PLUGIN@.sendStreamEvents`
+:   Whether to send stream events to the `topic` topic.
+    Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```shell
+*** events-kafka plugin
+***
+
+Should send stream events?     [y/N]? y
+Stream events topic            [gerrit]: gerrit_stream_events
+Should send messages asynchronously? [Y/n]? y
+Polling interval (ms)          [1000]: 3000
+Number of subscribers          [6]: 6
+Consumer group                 [my_group_id]: my_group_id
+```
+
 secure.config
 --------------------
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 1a3507d..028d589 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -88,27 +88,9 @@
   @GerritConfig(
       name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "true")
   public void consumeEvents() throws Exception {
-    PushOneCommit.Result r = createChange();
-
-    ReviewInput in = ReviewInput.recommend();
-    in.message = "LGTM";
-    gApi.changes().id(r.getChangeId()).revision("current").review(in);
-    List<ChangeMessageInfo> messages =
-        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
-    assertThat(messages).hasSize(2);
-    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
-    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
-
-    List<String> events = new ArrayList<>();
-    KafkaProperties kafkaProperties = kafkaProperties();
-    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
-      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
-      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
-      for (ConsumerRecord<String, String> record : records) {
-        events.add(record.value());
-      }
-    }
+    List<String> events = reviewNewChangeAndGetStreamEvents();
 
     // There are 6 events are received in the following order:
     // 1. refUpdate:        ref: refs/sequences/changes
@@ -125,7 +107,23 @@
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
     CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
-    assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+    assertThat(commentAddedEvent.comment).isEqualTo("Patch Set 1: Code-Review+1\n\nLGTM");
+  }
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.sendStreamEvents", value = "false")
+  public void shouldNotSendStreamEventsWhenDisabled() throws Exception {
+    List<String> events = reviewNewChangeAndGetStreamEvents();
+
+    assertThat(events).isEmpty();
   }
 
   @Test
@@ -193,6 +191,30 @@
     return plugin.getSysInjector().getInstance(KafkaProperties.class);
   }
 
+  private List<String> reviewNewChangeAndGetStreamEvents() throws Exception {
+    PushOneCommit.Result r = createChange();
+
+    ReviewInput in = ReviewInput.recommend();
+    in.message = "LGTM";
+    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+    List<ChangeMessageInfo> messages =
+        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+    assertThat(messages).hasSize(2);
+    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+
+    List<String> events = new ArrayList<>();
+    KafkaProperties kafkaProperties = kafkaProperties();
+    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+      for (ConsumerRecord<String, String> record : records) {
+        events.add(record.value());
+      }
+    }
+    return events;
+  }
+
   // XXX: Remove this method when merging into stable-3.3, since waitUntil is
   // available in Gerrit core.
   public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)