Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Update kafka-client 2.1.0 -> 2.1.1

Change-Id: Id68de680b34b0c312edfc77701ad6a45ca699b71
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 59cc4ec..1030261 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.0.5",
-        sha1 = "7abf72d2252f975baff666fbbf28b7036767aa81",
+        artifact = "com.gerritforge:events-broker:3.1.3",
+        sha1 = "a12ef44f9b75a5dbecac9f1f0acf0f236b220252",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java
new file mode 100644
index 0000000..2226ca2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.server.logging.PluginMetadata;
+
+/** Factory helpers for metric fields and descriptions shared by the Kafka events metrics. */
+public abstract class KafkaEventsMetrics {
+
+  /**
+   * Builds a string-valued metric field whose value is recorded as plugin metadata.
+   *
+   * @param metadataKey name of the field, reused as the key of the plugin metadata entry
+   * @param description description attached to the field
+   * @return the built field
+   */
+  public Field<String> stringField(String metadataKey, String description) {
+    return Field.ofString(
+            metadataKey,
+            (metadata, value) ->
+                metadata.addPluginMetadata(PluginMetadata.create(metadataKey, value)))
+        .description(description)
+        .build();
+  }
+
+  /**
+   * Builds a metric description flagged as a rate and expressed in the given unit.
+   *
+   * @param unit unit set on the description
+   * @param description text of the description
+   * @return the configured description
+   */
+  public Description rateDescription(String unit, String description) {
+    Description rate = new Description(description).setRate();
+    return rate.setUnit(unit);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
index 083d032..fbfd1fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
@@ -16,13 +16,13 @@
 
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
-import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.KafkaEventsMetrics;
 
 @Singleton
-public class KafkaEventsPublisherMetrics {
+public class KafkaEventsPublisherMetrics extends KafkaEventsMetrics {
   private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_success_counter";
   private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
 
@@ -38,14 +38,14 @@
             new Description("Number of successfully published messages by the broker publisher")
                 .setRate()
                 .setUnit("messages"),
-            Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+            stringField(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
     this.brokerPublisherFailureCounter =
         metricMaker.newCounter(
             "kafka/broker/broker_message_publisher_failure_counter",
             new Description("Number of messages failed to publish by the broker publisher")
                 .setRate()
                 .setUnit("errors"),
-            Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+            stringField(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
   }
 
   public void incrementBrokerPublishedMessage() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 40415a4..7ef9d7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -133,6 +133,12 @@
       try {
         while (!closed.get()) {
           if (resetOffset.getAndSet(false)) {
+            // Make sure there is an assignment for this consumer
+            while (consumer.assignment().isEmpty() && !closed.get()) {
+              logger.atInfo().log(
+                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+            }
             consumer.seekToBeginning(consumer.assignment());
           }
           ConsumerRecords<byte[], byte[]> consumerRecords =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
index 81174f9..54a6590 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -2,13 +2,13 @@
 
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
-import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.KafkaEventsMetrics;
 
 @Singleton
-class KafkaEventSubscriberMetrics {
+class KafkaEventSubscriberMetrics extends KafkaEventsMetrics {
 
   private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
       "subscriber_msg_consumer_poll_failure_counter";
@@ -26,7 +26,7 @@
             new Description("Number of failed attempts to poll messages by the subscriber")
                 .setRate()
                 .setUnit("errors"),
-            Field.ofString(
+            stringField(
                 SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
     this.subscriberFailureCounter =
         metricMaker.newCounter(
@@ -34,8 +34,7 @@
             new Description("Number of messages failed to consume by the subscriber consumer")
                 .setRate()
                 .setUnit("errors"),
-            Field.ofString(
-                SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+            stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
   }
 
   public void incrementSubscriberFailedToPollMessages() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index fbae0c6..73d4a24 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -15,8 +15,13 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.fail;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.GerritConfig;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -28,11 +33,15 @@
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -120,7 +129,79 @@
     assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
   }
 
+  /**
+   * Verifies that an event sent before subscribing is delivered to the subscriber, and that the
+   * same event is delivered a second time after {@code replayAllEvents} is called for the topic.
+   */
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
+  public void shouldReplayAllEvents() throws InterruptedException {
+    String topic = "a_topic";
+    EventMessage eventMessage =
+        new EventMessage(
+            new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()),
+            new ProjectCreatedEvent());
+
+    Duration waitForPollTimeout = Duration.ofMillis(1000);
+
+    // The callback registered with receiveAsync is expected to run off the test thread (the test
+    // polls for its effect below), so the list must be safe for concurrent access.
+    List<EventMessage> receivedEvents = Collections.synchronizedList(new ArrayList<>());
+
+    BrokerApi kafkaBrokerApi = kafkaBrokerApi();
+    kafkaBrokerApi.send(topic, eventMessage);
+
+    kafkaBrokerApi.receiveAsync(topic, receivedEvents::add);
+
+    waitUntil(() -> receivedEvents.size() == 1, waitForPollTimeout);
+
+    assertThat(receivedEvents.get(0).getHeader().eventId)
+        .isEqualTo(eventMessage.getHeader().eventId);
+
+    kafkaBrokerApi.replayAllEvents(topic);
+    waitUntil(() -> receivedEvents.size() == 2, waitForPollTimeout);
+
+    assertThat(receivedEvents.get(1).getHeader().eventId)
+        .isEqualTo(eventMessage.getHeader().eventId);
+  }
+
+  /** Returns the {@link BrokerApi} instance obtained from the plugin's sys injector. */
+  private BrokerApi kafkaBrokerApi() {
+    return plugin.getSysInjector().getInstance(BrokerApi.class);
+  }
+
   private KafkaProperties kafkaProperties() {
     return plugin.getSysInjector().getInstance(KafkaProperties.class);
   }
+
+  // XXX: Remove this method when merging into stable-3.3, since waitUntil is
+  // available in Gerrit core.
+  /**
+   * Blocks until {@code waitCondition} yields true, re-checking it every 50 ms.
+   *
+   * @param waitCondition condition to poll
+   * @param timeout maximum time to wait for the condition
+   * @throws InterruptedException if the condition is still false once {@code timeout} has elapsed,
+   *     or if the thread is interrupted while sleeping
+   */
+  public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        // Exception type kept for compatibility with existing callers; the message tells a
+        // timeout apart from a genuine thread interruption.
+        throw new InterruptedException("Condition not met within " + timeout);
+      }
+      MILLISECONDS.sleep(50);
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index eabc833..48350f9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -70,18 +70,6 @@
   private KafkaSession session;
   private Gson gson;
 
-  public static class TestKafkaContainer extends KafkaContainer {
-    public TestKafkaContainer() {
-      addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
-      addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
-    }
-
-    @Override
-    public String getBootstrapServers() {
-      return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), KAFKA_PORT);
-    }
-  }
-
   public static class TestWorkQueue extends WorkQueue {
 
     @Inject
@@ -149,7 +137,7 @@
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    kafka = new TestKafkaContainer();
+    kafka = new KafkaContainer();
     kafka.start();
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
   }