Add REST API endpoints for handling stream offsets for topics
* A GET endpoint that can be used to get current offsets
for a topic and when they were last updated
* A POST endpoint to move offsets for a specific topic
either with a absolute value or relatively with a negative
value
Change-Id: I9cce853fdd233c1324e846c84c1fd1c99971e54e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 6984f98..6b40a13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -48,6 +48,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.message.GerritEventPublisherFactory;
import com.googlesource.gerrit.plugins.rabbitmq.message.GsonProvider;
import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
+import com.googlesource.gerrit.plugins.rabbitmq.rest.RestModule;
import com.googlesource.gerrit.plugins.rabbitmq.session.PublisherSession;
import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPPublisherSession;
@@ -116,6 +117,7 @@
new FactoryModuleBuilder()
.implement(EventWorker.class, UserEventWorker.class)
.build(EventWorkerFactory.class));
+ install(new RestModule());
bind(SubscriberSession.Factory.class)
.to(SubscriberSessionFactoryImpl.class)
.in(Singleton.class);
@@ -133,7 +135,8 @@
install(rabbitMqBrokerApiModule);
} else {
logger.atInfo().log(
- "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable it");
+ "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable"
+ + " it");
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
index d7acc92..1c474c2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -23,6 +23,8 @@
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.restapi.PreconditionFailedException;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
import com.google.gson.Gson;
@@ -31,9 +33,12 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
import com.googlesource.gerrit.plugins.rabbitmq.session.type.StreamSubscriberSession;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -119,7 +124,8 @@
});
} catch (RetryException e) {
logger.atSevere().withCause(e).log(
- "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a plugin missing or failing to load.",
+ "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a"
+ + " plugin missing or failing to load.",
messageBody, retryTime);
return null;
} catch (ExecutionException e) {
@@ -150,4 +156,52 @@
"Only streams support the replay functionality, please enable stream support to use this");
return false;
}
+
+ public void replayAllEventsAt(String topic, long offset)
+ throws ResourceNotFoundException, PreconditionFailedException {
+
+ if (!properties.getSection(Stream.class).enabled) {
+ throw new PreconditionFailedException("Stream support must be enabled to use replay functionality");
+ }
+ boolean found = false;
+ StreamSubscriberSession streamSession = (StreamSubscriberSession) session;
+
+ // Create a copy to avoid ConcurrentModificationException
+ Map<TopicSubscriber, String> consumerTagsCopy = new HashMap<>(consumerTags);
+ for (Map.Entry<TopicSubscriber, String> entry : consumerTagsCopy.entrySet()) {
+ TopicSubscriber topicSubscriber = entry.getKey();
+ String consumerTag = entry.getValue();
+ if (topicSubscriber.topic().equals(topic)) {
+ streamSession.resetOffset(consumerTag, offset);
+ removeSubscriber(topicSubscriber);
+ addSubscriber(topicSubscriber);
+ found = true;
+ }
+ }
+ if (!found) {
+ throw new ResourceNotFoundException("No subscriber found for topic " + topic);
+ }
+ }
+
+ public List<OffsetInfo> getOffsetInfoForTopic(String topic)
+ throws ResourceNotFoundException, PreconditionFailedException {
+
+ if (!properties.getSection(Stream.class).enabled) {
+ throw new PreconditionFailedException("Stream support must be enabled to use replay functionality");
+ }
+ List<OffsetInfo> offsetInfos = new ArrayList<>();
+ boolean found = false;
+ StreamSubscriberSession streamSession = (StreamSubscriberSession) session;
+ // No need for copying since we're only reading, not modifying
+ for (Map.Entry<TopicSubscriber, String> entry : consumerTags.entrySet()) {
+ if (entry.getKey().topic().equals(topic)) {
+ offsetInfos.add(streamSession.getCurrentOffsetInfo(entry.getValue()));
+ found = true;
+ }
+ }
+ if (!found) {
+ throw new ResourceNotFoundException("No subscriber found for topic " + topic);
+ }
+ return offsetInfos;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java
new file mode 100644
index 0000000..c33e1e2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestReadView;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
+import java.util.ArrayList;
+import java.util.List;
+
+/** REST endpoint for getting current offsets for a specific topic. */
+@Singleton
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+public class GetOffsetsForTopic implements RestReadView<TopicResource> {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ static class Output {
+ public List<OffsetInfo> offsets;
+
+ Output(List<OffsetInfo> offsets) {
+ this.offsets = offsets;
+ }
+ }
+
+ private final BrokerApiSubscribers brokerApiSubscribers;
+
+ @Inject
+ public GetOffsetsForTopic(BrokerApiSubscribers brokerApiSubscribers) {
+ this.brokerApiSubscribers = brokerApiSubscribers;
+ }
+
+ @Override
+ public Response<Output> apply(TopicResource resource) throws RestApiException {
+ return Response.ok(new Output(brokerApiSubscribers.getOffsetInfoForTopic(resource.getTopicName())));
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java
new file mode 100644
index 0000000..41b5f8b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java
@@ -0,0 +1,99 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
+import java.util.List;
+
+/** REST endpoint for replaying all events from a specific offset. */
+@Singleton
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+public class ReplayEventsAtOffset
+ implements RestModifyView<TopicResource, ReplayEventsAtOffset.Input> {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ static class Input {
+ Long offset;
+ }
+
+ static class Output {
+ Long startOffset;
+
+ Output(Long startOffset) {
+ this.startOffset = startOffset;
+ }
+ }
+
+ private final BrokerApiSubscribers brokerApiSubscribers;
+
+ @Inject
+ public ReplayEventsAtOffset(BrokerApiSubscribers brokerApiSubscribers) {
+ this.brokerApiSubscribers = brokerApiSubscribers;
+ }
+
+ @Override
+ public Response<Output> apply(TopicResource resource, Input input) throws RestApiException {
+ if (input == null) {
+ throw new BadRequestException("Request body is required");
+ }
+
+ if (input.offset == null) {
+ throw new BadRequestException("offset field is required");
+ }
+
+ String topic = resource.getTopicName();
+ long replayOffset = input.offset;
+
+ // If offset is negative, calculate relative to current offset
+ if (input.offset < 0) {
+ List<OffsetInfo> offsetInfos = brokerApiSubscribers.getOffsetInfoForTopic(topic);
+
+ if (offsetInfos.isEmpty()) {
+ throw new ResourceNotFoundException("Topic not found: " + topic);
+ }
+
+ // Use the maximum current offset for calculation
+ long currentOffset = offsetInfos.stream().mapToLong(info -> info.offset).max().orElse(0);
+ replayOffset = currentOffset + input.offset;
+
+ if (replayOffset < 0) {
+ replayOffset = 0;
+ }
+
+ logger.atFine().log(
+ "Calculated replay offset %d from current offset %d with relative offset %d for topic %s",
+ replayOffset, currentOffset, input.offset, topic);
+ }
+
+ brokerApiSubscribers.replayAllEventsAt(topic, replayOffset);
+
+ logger.atFine().log(
+ "Successfully replayed events for topic %s at offset %d", topic, replayOffset);
+
+ return Response.ok(new Output(replayOffset));
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java
new file mode 100644
index 0000000..8aad78e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import static com.google.gerrit.server.config.ConfigResource.CONFIG_KIND;
+
+import com.google.gerrit.extensions.registration.DynamicMap;
+import com.google.gerrit.extensions.restapi.RestApiModule;
+
+public class RestModule extends RestApiModule {
+
+ @Override
+ protected void configure() {
+ // Register the TopicResource kind so Gerrit knows about it
+ DynamicMap.mapOf(binder(), TopicResource.TOPIC_KIND);
+ // Register topics as a child collection
+ child(CONFIG_KIND, "topics").to(TopicsCollection.class);
+ get(TopicResource.TOPIC_KIND, "offsets").to(GetOffsetsForTopic.class);
+ post(TopicResource.TOPIC_KIND, "replay").to(ReplayEventsAtOffset.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java
new file mode 100644
index 0000000..ed22b70
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.gerrit.extensions.restapi.RestResource;
+import com.google.gerrit.extensions.restapi.RestView;
+import com.google.inject.TypeLiteral;
+
+/** Resource representing a specific topic in the RabbitMQ plugin. */
+public class TopicResource implements RestResource {
+ public static final TypeLiteral<RestView<TopicResource>> TOPIC_KIND =
+ new TypeLiteral<RestView<TopicResource>>() {};
+
+ private final String topicName;
+
+ public TopicResource(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java
new file mode 100644
index 0000000..d831908
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.gerrit.extensions.registration.DynamicMap;
+import com.google.gerrit.extensions.restapi.ChildCollection;
+import com.google.gerrit.extensions.restapi.IdString;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
+import com.google.gerrit.extensions.restapi.RestView;
+import com.google.gerrit.server.config.ConfigResource;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/** Collection of topics in the RabbitMQ plugin. */
+@Singleton
+public class TopicsCollection implements ChildCollection<ConfigResource, TopicResource> {
+
+ private final DynamicMap<RestView<TopicResource>> views;
+
+ @Inject
+ public TopicsCollection(DynamicMap<RestView<TopicResource>> views) {
+ this.views = views;
+ }
+
+ @Override
+ public RestView<ConfigResource> list() throws ResourceNotFoundException {
+ throw new ResourceNotFoundException("Topic listing not supported");
+ }
+
+ @Override
+ public TopicResource parse(ConfigResource parent, IdString id) throws ResourceNotFoundException {
+ return new TopicResource(id.get());
+ }
+
+ @Override
+ public DynamicMap<RestView<TopicResource>> views() {
+ return views;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java
new file mode 100644
index 0000000..d3e0c58
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.session;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/** Information about a consumer's current offset and when it was last updated. */
+public class OffsetInfo {
+ public final long offset;
+ public final String lastUpdated;
+
+ public OffsetInfo(long offset, long lastUpdatedMillis) {
+ this.offset = offset;
+ this.lastUpdated = Instant.ofEpochMilli(lastUpdatedMillis)
+ .atZone(ZoneId.systemDefault())
+ .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
index a266beb..57bfa42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
@@ -20,6 +20,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Exchange;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
import com.rabbitmq.client.Channel;
import com.rabbitmq.stream.Message;
@@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public final class StreamSubscriberSession extends StreamSession implements SubscriberSession {
@@ -72,13 +74,14 @@
bindStreamToExchange(streamName, topic);
String consumerId = UUID.randomUUID().toString();
+ Handler handler = new Handler(topic, messageBodyConsumer, consumerId);
com.rabbitmq.stream.Consumer consumer =
environment.consumerBuilder().stream(streamName)
.offset(OffsetSpecification.first())
.name(consumerName)
.manualTrackingStrategy()
.builder()
- .messageHandler(new Handler(topic, messageBodyConsumer, consumerId))
+ .messageHandler(handler)
.build();
try {
logger.atInfo().log(
@@ -91,6 +94,7 @@
}
consumers.put(consumerId, consumer, false);
+ handler.start();
return consumerId;
}
@@ -114,11 +118,15 @@
consumers.reset(consumerId, offset);
}
+ public OffsetInfo getCurrentOffsetInfo(String consumerId) {
+ return consumers.getCurrentOffsetInfo(consumerId);
+ }
+
private class Consumers {
- private volatile Map<String, ConsumerPair> consumersMap = new ConcurrentHashMap<>();
+ private volatile Map<String, ConsumerState> consumersMap = new ConcurrentHashMap<>();
void put(String consumerId, com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
- consumersMap.put(consumerId, new ConsumerPair(consumer, resetOffset));
+ consumersMap.put(consumerId, new ConsumerState(consumer, resetOffset));
}
boolean isReset(String consumerId) {
@@ -135,18 +143,37 @@
}
}
+ void updateCurrentOffset(String consumerId, long offset) {
+ ConsumerState state = consumersMap.get(consumerId);
+ if (state != null) {
+ logger.atFinest().log("Update current offset to %d for consumer %s", offset, consumerId);
+ state.currentOffset.set(offset);
+ state.lastOffsetUpdateTime.set(System.currentTimeMillis());
+ } else {
+ logger.atSevere().log("Failed to update current offset, consumer: %s does not exist", consumerId);
+ }
+ }
+
+ OffsetInfo getCurrentOffsetInfo(String consumerId) {
+ ConsumerState state = consumersMap.get(consumerId);
+ if (state != null) {
+ return new OffsetInfo(state.currentOffset.get(), state.lastOffsetUpdateTime.get());
+ }
+ return new OffsetInfo(-1L, -1L);
+ }
+
boolean closeConsumer(String consumerId) {
- ConsumerPair pair = consumersMap.remove(consumerId);
- if (pair == null) {
+ ConsumerState state = consumersMap.remove(consumerId);
+ if (state == null) {
return false;
}
- pair.consumer.close();
+ state.consumer.close();
return true;
}
void close() {
synchronized (consumersMap) {
- Iterator<Entry<String, ConsumerPair>> it = consumersMap.entrySet().iterator();
+ Iterator<Entry<String, ConsumerState>> it = consumersMap.entrySet().iterator();
while (it.hasNext()) {
it.next().getValue().consumer.close();
it.remove();
@@ -154,13 +181,24 @@
}
}
- private class ConsumerPair {
+ private class ConsumerState {
com.rabbitmq.stream.Consumer consumer;
boolean resetOffset;
+ AtomicLong currentOffset;
+ AtomicLong lastOffsetUpdateTime;
- ConsumerPair(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
+ ConsumerState(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
this.consumer = consumer;
this.resetOffset = resetOffset;
+ // Initialize currentOffset with the consumer's stored offset if available
+ long initialOffset = -1L;
+ try {
+ initialOffset = consumer.storedOffset();
+ } catch (Exception e) {
+ logger.atFine().withCause(e).log("Could not get initial stored offset, using -1");
+ }
+ this.currentOffset = new AtomicLong(initialOffset);
+ this.lastOffsetUpdateTime = new AtomicLong(System.currentTimeMillis());
}
}
}
@@ -170,6 +208,7 @@
private String topic;
private Consumer<String> messageBodyConsumer;
private String consumerId;
+ private volatile boolean run = false;
Handler(String topic, Consumer<String> messageBodyConsumer, String consumerId) {
messageConsumed = new AtomicInteger(0);
@@ -180,10 +219,15 @@
@Override
public void handle(MessageHandler.Context context, Message message) {
+ while (!run)
+ ;
+
Stream prop = properties.getSection(Stream.class);
try {
logger.atFiner().log(
"Consume message from topic %s with offset %d", topic, context.offset());
+
+ consumers.updateCurrentOffset(consumerId, context.offset());
messageBodyConsumer.accept(new String(message.getBodyAsBinary(), "UTF-8"));
if (messageConsumed.incrementAndGet() % prop.windowSize == 0) {
com.rabbitmq.stream.Consumer consumer = context.consumer();
@@ -200,5 +244,9 @@
"Error handling stream message with id %d", message.getPublishingId());
}
}
+
+ void start() {
+ run = true;
+ }
}
}
diff --git a/src/main/resources/Documentation/rest-api-replay-events.md b/src/main/resources/Documentation/rest-api-replay-events.md
new file mode 100644
index 0000000..da7091d
--- /dev/null
+++ b/src/main/resources/Documentation/rest-api-replay-events.md
@@ -0,0 +1,100 @@
+@PLUGIN@ POST topics/{topic}/replay
+====================================
+
+SYNOPSIS
+--------
+
+```
+POST /config/server/events-rabbitmq~topics/{topic}/replay
+```
+
+DESCRIPTION
+-----------
+Replays all events from a specific offset for a given RabbitMQ topic in the events-rabbitmq plugin.
+
+This endpoint allows administrators to replay messages from a specific point in the stream, which
+is useful for:
+* Recovering from message processing errors
+* Re-processing events after configuration changes
+* Debugging message handling issues
+* Backfilling data after system maintenance
+
+The replay operation will start from the specified offset and process all subsequent messages in
+the topic.
+
+ACCESS
+------
+**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability.
+
+PARAMETERS
+----------
+**topic**: The name of the RabbitMQ topic to replay events from. This is specified as a path parameter
+in the URL.
+
+REQUEST BODY
+------------
+The request must include a JSON body with the following field:
+
+* **offset** (number, required): The offset position to start replaying from. Can be:
+ - A positive number for an absolute offset position
+ - A negative number for a relative offset from the current maximum offset (e.g., -10 means
+ "10 messages back from current position")
+
+EXAMPLES
+--------
+
+Replay events from absolute offset 1000 for the "gerrit" topic:
+
+```
+curl -X POST --user admin:secret \
+ -H "Content-Type: application/json" \
+ -d '{"offset": 1000}' \
+ http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay
+```
+
+Replay starting 50 messages back from the current position for the "gerrit" topic:
+
+```
+curl -X POST --user admin:secret \
+ -H "Content-Type: application/json" \
+ -d '{"offset": -50}' \
+ http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay
+```
+
+Response:
+
+```
+)]}'
+{
+ "startOffset": 1000
+}
+```
+
+**Response Fields:**
+
+* **startOffset** (number): The calculated absolute offset position where the replay operation
+started.
+
+NOTES
+-----
+
+* **Negative offsets**: When using negative offsets, the system calculates the absolute position by
+finding the maximum current offset from all consumers for the topic and adding the negative offset
+value. If the calculated position would be less than 0, it starts from offset 0.
+* **Consumer reset**: The replay operation will reset and recreate consumers for the specified topic.
+* **Offset behavior**: The offset specifies the starting position for replay:
+ - **Offset 0**: The first event (at offset 0) will be the first event processed
+ - **Offset n**: The event at offset n+1 will be the first event processed (offset n is used as the starting point, then processing begins from the next message)
+
+SEE ALSO
+--------
+
+* [GET topics/{topic}/offsets](rest-api-topics-offsets-get.html)
+* [Plugin Configuration](config.html)
+* [Message Processing](message.html)
+* [Plugin Development](../../../Documentation/dev-plugins.html)
+* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details)
+
+GERRIT
+------
+Part of [Gerrit Code Review](../../../Documentation/index.html)
diff --git a/src/main/resources/Documentation/rest-api-topics-offsets-get.md b/src/main/resources/Documentation/rest-api-topics-offsets-get.md
new file mode 100644
index 0000000..d5aa275
--- /dev/null
+++ b/src/main/resources/Documentation/rest-api-topics-offsets-get.md
@@ -0,0 +1,87 @@
+@PLUGIN@ GET topics/{topic}/offsets
+=====================================
+
+SYNOPSIS
+--------
+
+```
+GET /config/server/events-rabbitmq~topics/{topic}/offsets
+```
+
+DESCRIPTION
+-----------
+Gets the current consumer offsets for a specific RabbitMQ topic in the events-rabbitmq plugin.
+
+This endpoint allows administrators to monitor the current position of consumers for a given topic,
+which is useful for monitoring message processing status and identifying potential backlogs.
+
+ACCESS
+------
+**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability.
+
+PARAMETERS
+----------
+**topic**: The name of the RabbitMQ topic to get offsets for. This is specified as a path parameter
+in the URL.
+
+EXAMPLES
+--------
+
+Get the current offsets for the "gerrit" topic:
+
+```
+curl -X GET --user admin:secret \
+ http://host:port/a/config/server/events-rabbitmq~topics/gerrit/offsets
+```
+
+Response:
+
+```
+)]}'
+{
+ "offsets": [
+ {
+ "offset": 162,
+ "lastUpdated": "2025-10-07T14:30:15.123Z"
+ },
+ {
+ "offset": 158,
+ "lastUpdated": "2025-10-07T14:30:12.456Z"
+ },
+ {
+ "offset": 145,
+ "lastUpdated": "2025-10-07T14:29:58.789Z"
+ }
+ ]
+}
+```
+
+**Response Fields:**
+
+* **offsets** (array): List of offset information objects for each consumer on this topic.
+ * **offset** (number): The current offset position of the consumer.
+ * **lastUpdated** (string): ISO 8601 timestamp indicating when this offset was last updated.
+
+NOTES
+-----
+
+* Each consumer may be at a different offset position depending on processing speed and when it was
+started.
+* Offset values represent the position of the last processed message for each consumer.
+* An offset value of -1 indicates that the consumer hasn't processed any messages yet.
+* The `lastUpdated` timestamp shows when the offset was last modified, which helps identify stale
+or inactive consumers.
+* All timestamps are returned in UTC timezone using ISO 8601 format.
+* The list contains one offset information object per consumer for the specified topic.
+
+SEE ALSO
+--------
+
+* [POST topics/{topic}/replay](rest-api-replay-events.html)
+* [Plugin Configuration](config.html)
+* [Plugin Development](../../../Documentation/dev-plugins.html)
+* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details)
+
+GERRIT
+------
+Part of [Gerrit Code Review](../../../Documentation/index.html)