Implement broker api replayAllEvents functionality

This functionality allows to reset kafka offset and redeliver all stored
messages.

Change-Id: I0aeacd79b90c21de95317b212173362a3db9e2e0
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bfbf487..4f599f7 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.0.3",
-        sha1 = "efdc5bf6897563e2f6f85bfc1b8a5d65e3393424",
+        artifact = "com.gerritforge:events-broker:3.0.4",
+        sha1 = "350b438f532678b1f9a277b7e7b6fa9da4b725b3",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 78df908..9a7c66a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -69,4 +69,11 @@
         .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
         .collect(Collectors.toSet());
   }
+
+  @Override
+  public void replayAllEvents(String topic) {
+    subscribers.stream()
+        .filter(subscriber -> topic.equals(subscriber.getTopic()))
+        .forEach(subscriber -> subscriber.resetOffset());
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 6548f2d..d925348 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -46,6 +46,7 @@
   private java.util.function.Consumer<EventMessage> messageProcessor;
 
   private String topic;
+  private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
   @Inject
   public KafkaEventSubscriber(
@@ -94,6 +95,10 @@
     return topic;
   }
 
+  public void resetOffset() {
+    resetOffset.set(true);
+  }
+
   private class ReceiverJob implements Runnable {
 
     @Override
@@ -104,6 +109,9 @@
     private void consume() {
       try {
         while (!closed.get()) {
+          if (resetOffset.getAndSet(false)) {
+            consumer.seekToBeginning(consumer.assignment());
+          }
           ConsumerRecords<byte[], byte[]> consumerRecords =
               consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
           consumerRecords.forEach(