Merge branch 'stable-3.3' into master

* stable-3.3:
  Fix kafka-events replay messages feature

Change-Id: I22db5cbcff4dc63752be9d419f5415e76abc6acb
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 40415a4..7ef9d7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -133,6 +133,12 @@
       try {
         while (!closed.get()) {
           if (resetOffset.getAndSet(false)) {
+            // Make sure there is an assignment for this consumer
+            while (consumer.assignment().isEmpty() && !closed.get()) {
+              logger.atInfo().log(
+                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+            }
             consumer.seekToBeginning(consumer.assignment());
           }
           ConsumerRecords<byte[], byte[]> consumerRecords =
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 1c999db..f74eb0a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -15,9 +15,13 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.Assert.fail;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.NoHttpd;
@@ -29,11 +33,15 @@
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -121,7 +129,62 @@
     assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
   }
 
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.kafka-events.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.kafka-events.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.kafka-events.pollingIntervalMs", value = "500")
+  public void shouldReplayAllEvents() throws InterruptedException {
+    String topic = "a_topic";
+    EventMessage eventMessage =
+        new EventMessage(
+            new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()),
+            new ProjectCreatedEvent());
+    Duration waitForPollTimeout = Duration.ofMillis(1000);
+
+    // The receiveAsync callback may run on another thread than this test (hence the
+    // waitUntil polling), so the list it appends to is a synchronized view.
+    List<EventMessage> receivedEvents = Collections.synchronizedList(new ArrayList<>());
+
+    BrokerApi kafkaBrokerApi = kafkaBrokerApi();
+    kafkaBrokerApi.send(topic, eventMessage);
+    kafkaBrokerApi.receiveAsync(topic, receivedEvents::add);
+
+    // First delivery: the event sent above.
+    waitUntil(() -> receivedEvents.size() == 1, waitForPollTimeout);
+    assertThat(receivedEvents.get(0).getHeader().eventId)
+        .isEqualTo(eventMessage.getHeader().eventId);
+
+    // Second delivery: the same event again, after requesting a replay of the topic.
+    kafkaBrokerApi.replayAllEvents(topic);
+    waitUntil(() -> receivedEvents.size() == 2, waitForPollTimeout);
+    assertThat(receivedEvents.get(1).getHeader().eventId)
+        .isEqualTo(eventMessage.getHeader().eventId);
+  }
+
+  private BrokerApi kafkaBrokerApi() { // instance obtained from the plugin's sys injector
+    return plugin.getSysInjector().getInstance(BrokerApi.class);
+  }
+
   private KafkaProperties kafkaProperties() {
     return plugin.getSysInjector().getInstance(KafkaProperties.class);
   }
+
+  // Polls waitCondition every 50 ms until it holds and throws InterruptedException once
+  // timeout elapses. TODO: replace with Gerrit core's waitUntil (due since stable-3.3).
+  public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        throw new InterruptedException("Condition not met within " + timeout);
+      }
+      MILLISECONDS.sleep(50);
+    }
+  }
 }