Implement broker api replayAllEvents functionality
This functionality allows to reset kafka offset and redeliver all stored
messages.
Change-Id: I0aeacd79b90c21de95317b212173362a3db9e2e0
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bfbf487..4f599f7 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.0.3",
- sha1 = "efdc5bf6897563e2f6f85bfc1b8a5d65e3393424",
+ artifact = "com.gerritforge:events-broker:3.0.4",
+ sha1 = "350b438f532678b1f9a277b7e7b6fa9da4b725b3",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 78df908..9a7c66a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -69,4 +69,11 @@
.map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
.collect(Collectors.toSet());
}
+
+ @Override
+ public void replayAllEvents(String topic) {
+ subscribers.stream()
+ .filter(subscriber -> topic.equals(subscriber.getTopic()))
+ .forEach(subscriber -> subscriber.resetOffset());
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 6548f2d..d925348 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -46,6 +46,7 @@
private java.util.function.Consumer<EventMessage> messageProcessor;
private String topic;
+ private AtomicBoolean resetOffset = new AtomicBoolean(false);
@Inject
public KafkaEventSubscriber(
@@ -94,6 +95,10 @@
return topic;
}
+ public void resetOffset() {
+ resetOffset.set(true);
+ }
+
private class ReceiverJob implements Runnable {
@Override
@@ -104,6 +109,9 @@
private void consume() {
try {
while (!closed.get()) {
+ if (resetOffset.getAndSet(false)) {
+ consumer.seekToBeginning(consumer.assignment());
+ }
ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
consumerRecords.forEach(