Add REST API endpoints for handling stream offsets for topics * A GET endpoint that can be used to get current offsets for a topic and when they were last updated * A POST endpoint to move offsets for a specific topic either with a absolute value or relatively with a negative value Change-Id: I9cce853fdd233c1324e846c84c1fd1c99971e54e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java index 6984f98..6b40a13 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -48,6 +48,7 @@ import com.googlesource.gerrit.plugins.rabbitmq.message.GerritEventPublisherFactory; import com.googlesource.gerrit.plugins.rabbitmq.message.GsonProvider; import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher; +import com.googlesource.gerrit.plugins.rabbitmq.rest.RestModule; import com.googlesource.gerrit.plugins.rabbitmq.session.PublisherSession; import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession; import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPPublisherSession; @@ -116,6 +117,7 @@ new FactoryModuleBuilder() .implement(EventWorker.class, UserEventWorker.class) .build(EventWorkerFactory.class)); + install(new RestModule()); bind(SubscriberSession.Factory.class) .to(SubscriberSessionFactoryImpl.class) .in(Singleton.class); @@ -133,7 +135,8 @@ install(rabbitMqBrokerApiModule); } else { logger.atInfo().log( - "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable it"); + "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable" + + " it"); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java index d7acc92..1c474c2 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -23,6 +23,8 @@ import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.extensions.restapi.PreconditionFailedException; +import com.google.gerrit.extensions.restapi.ResourceNotFoundException; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventGson; import com.google.gson.Gson; @@ -31,9 +33,12 @@ import com.google.inject.Singleton; import com.googlesource.gerrit.plugins.rabbitmq.config.Properties; import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream; +import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo; import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession; import com.googlesource.gerrit.plugins.rabbitmq.session.type.StreamSubscriberSession; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -119,7 +124,8 @@ }); } catch (RetryException e) { logger.atSevere().withCause(e).log( - "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a plugin missing or failing to load.", + "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a" + + " plugin missing or failing to load.", messageBody, retryTime); return null; } catch (ExecutionException e) { @@ -150,4 +156,52 @@ "Only streams support the replay functionality, please enable stream support to use this"); return false; } + + public void replayAllEventsAt(String topic, long offset) + throws ResourceNotFoundException, PreconditionFailedException { + + if (!properties.getSection(Stream.class).enabled) { + throw new PreconditionFailedException("Stream support must be enabled to use replay functionality"); + } + boolean found = false; + StreamSubscriberSession streamSession = (StreamSubscriberSession) session; + + // Create a copy to avoid ConcurrentModificationException + Map<TopicSubscriber, String> consumerTagsCopy = new HashMap<>(consumerTags); + for (Map.Entry<TopicSubscriber, String> entry : consumerTagsCopy.entrySet()) { + TopicSubscriber topicSubscriber = entry.getKey(); + String consumerTag = entry.getValue(); + if (topicSubscriber.topic().equals(topic)) { + streamSession.resetOffset(consumerTag, offset); + removeSubscriber(topicSubscriber); + addSubscriber(topicSubscriber); + found = true; + } + } + if (!found) { + throw new ResourceNotFoundException("No subscriber found for topic " + topic); + } + } + + public List<OffsetInfo> getOffsetInfoForTopic(String topic) + throws ResourceNotFoundException, PreconditionFailedException { + + if (!properties.getSection(Stream.class).enabled) { + throw new PreconditionFailedException("Stream support must be enabled to use replay functionality"); + } + List<OffsetInfo> offsetInfos = new ArrayList<>(); + boolean found = false; + StreamSubscriberSession streamSession = (StreamSubscriberSession) session; + // No need for copying since we're only reading, not modifying + for (Map.Entry<TopicSubscriber, String> entry : consumerTags.entrySet()) { + if (entry.getKey().topic().equals(topic)) { + offsetInfos.add(streamSession.getCurrentOffsetInfo(entry.getValue())); + found = true; + } + } + if (!found) { + throw new ResourceNotFoundException("No subscriber found for topic " + topic); + } + return offsetInfos; + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java new file mode 100644 index 0000000..c33e1e2 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java
@@ -0,0 +1,56 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.rest; + +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.common.data.GlobalCapability; +import com.google.gerrit.extensions.annotations.RequiresCapability; +import com.google.gerrit.extensions.restapi.Response; +import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.extensions.restapi.RestReadView; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers; +import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo; +import java.util.ArrayList; +import java.util.List; + +/** REST endpoint for getting current offsets for a specific topic. */ +@Singleton +@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER) +public class GetOffsetsForTopic implements RestReadView<TopicResource> { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + static class Output { + public List<OffsetInfo> offsets; + + Output(List<OffsetInfo> offsets) { + this.offsets = offsets; + } + } + + private final BrokerApiSubscribers brokerApiSubscribers; + + @Inject + public GetOffsetsForTopic(BrokerApiSubscribers brokerApiSubscribers) { + this.brokerApiSubscribers = brokerApiSubscribers; + } + + @Override + public Response<Output> apply(TopicResource resource) throws RestApiException { + return Response.ok(new Output(brokerApiSubscribers.getOffsetInfoForTopic(resource.getTopicName()))); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java new file mode 100644 index 0000000..41b5f8b --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java
@@ -0,0 +1,99 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.rest; + +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.common.data.GlobalCapability; +import com.google.gerrit.extensions.annotations.RequiresCapability; +import com.google.gerrit.extensions.restapi.BadRequestException; +import com.google.gerrit.extensions.restapi.ResourceNotFoundException; +import com.google.gerrit.extensions.restapi.Response; +import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.extensions.restapi.RestModifyView; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers; +import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo; +import java.util.List; + +/** REST endpoint for replaying all events from a specific offset. */ +@Singleton +@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER) +public class ReplayEventsAtOffset + implements RestModifyView<TopicResource, ReplayEventsAtOffset.Input> { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + static class Input { + Long offset; + } + + static class Output { + Long startOffset; + + Output(Long startOffset) { + this.startOffset = startOffset; + } + } + + private final BrokerApiSubscribers brokerApiSubscribers; + + @Inject + public ReplayEventsAtOffset(BrokerApiSubscribers brokerApiSubscribers) { + this.brokerApiSubscribers = brokerApiSubscribers; + } + + @Override + public Response<Output> apply(TopicResource resource, Input input) throws RestApiException { + if (input == null) { + throw new BadRequestException("Request body is required"); + } + + if (input.offset == null) { + throw new BadRequestException("offset field is required"); + } + + String topic = resource.getTopicName(); + long replayOffset = input.offset; + + // If offset is negative, calculate relative to current offset + if (input.offset < 0) { + List<OffsetInfo> offsetInfos = brokerApiSubscribers.getOffsetInfoForTopic(topic); + + if (offsetInfos.isEmpty()) { + throw new ResourceNotFoundException("Topic not found: " + topic); + } + + // Use the maximum current offset for calculation + long currentOffset = offsetInfos.stream().mapToLong(info -> info.offset).max().orElse(0); + replayOffset = currentOffset + input.offset; + + if (replayOffset < 0) { + replayOffset = 0; + } + + logger.atFine().log( + "Calculated replay offset %d from current offset %d with relative offset %d for topic %s", + replayOffset, currentOffset, input.offset, topic); + } + + brokerApiSubscribers.replayAllEventsAt(topic, replayOffset); + + logger.atFine().log( + "Successfully replayed events for topic %s at offset %d", topic, replayOffset); + + return Response.ok(new Output(replayOffset)); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java new file mode 100644 index 0000000..8aad78e --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java
@@ -0,0 +1,33 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.rest; + +import static com.google.gerrit.server.config.ConfigResource.CONFIG_KIND; + +import com.google.gerrit.extensions.registration.DynamicMap; +import com.google.gerrit.extensions.restapi.RestApiModule; + +public class RestModule extends RestApiModule { + + @Override + protected void configure() { + // Register the TopicResource kind so Gerrit knows about it + DynamicMap.mapOf(binder(), TopicResource.TOPIC_KIND); + // Register topics as a child collection + child(CONFIG_KIND, "topics").to(TopicsCollection.class); + get(TopicResource.TOPIC_KIND, "offsets").to(GetOffsetsForTopic.class); + post(TopicResource.TOPIC_KIND, "replay").to(ReplayEventsAtOffset.class); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java new file mode 100644 index 0000000..ed22b70 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java
@@ -0,0 +1,35 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.rest; + +import com.google.gerrit.extensions.restapi.RestResource; +import com.google.gerrit.extensions.restapi.RestView; +import com.google.inject.TypeLiteral; + +/** Resource representing a specific topic in the RabbitMQ plugin. */ +public class TopicResource implements RestResource { + public static final TypeLiteral<RestView<TopicResource>> TOPIC_KIND = + new TypeLiteral<RestView<TopicResource>>() {}; + + private final String topicName; + + public TopicResource(String topicName) { + this.topicName = topicName; + } + + public String getTopicName() { + return topicName; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java new file mode 100644 index 0000000..d831908 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java
@@ -0,0 +1,51 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.rest; + +import com.google.gerrit.extensions.registration.DynamicMap; +import com.google.gerrit.extensions.restapi.ChildCollection; +import com.google.gerrit.extensions.restapi.IdString; +import com.google.gerrit.extensions.restapi.ResourceNotFoundException; +import com.google.gerrit.extensions.restapi.RestView; +import com.google.gerrit.server.config.ConfigResource; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** Collection of topics in the RabbitMQ plugin. */ +@Singleton +public class TopicsCollection implements ChildCollection<ConfigResource, TopicResource> { + + private final DynamicMap<RestView<TopicResource>> views; + + @Inject + public TopicsCollection(DynamicMap<RestView<TopicResource>> views) { + this.views = views; + } + + @Override + public RestView<ConfigResource> list() throws ResourceNotFoundException { + throw new ResourceNotFoundException("Topic listing not supported"); + } + + @Override + public TopicResource parse(ConfigResource parent, IdString id) throws ResourceNotFoundException { + return new TopicResource(id.get()); + } + + @Override + public DynamicMap<RestView<TopicResource>> views() { + return views; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java new file mode 100644 index 0000000..d3e0c58 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java
@@ -0,0 +1,32 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.rabbitmq.session; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +/** Information about a consumer's current offset and when it was last updated. */ +public class OffsetInfo { + public final long offset; + public final String lastUpdated; + + public OffsetInfo(long offset, long lastUpdatedMillis) { + this.offset = offset; + this.lastUpdated = Instant.ofEpochMilli(lastUpdatedMillis) + .atZone(ZoneId.systemDefault()) + .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java index a266beb..57bfa42 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
@@ -20,6 +20,7 @@ import com.googlesource.gerrit.plugins.rabbitmq.config.Properties; import com.googlesource.gerrit.plugins.rabbitmq.config.section.Exchange; import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream; +import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo; import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession; import com.rabbitmq.client.Channel; import com.rabbitmq.stream.Message; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; public final class StreamSubscriberSession extends StreamSession implements SubscriberSession { @@ -72,13 +74,14 @@ bindStreamToExchange(streamName, topic); String consumerId = UUID.randomUUID().toString(); + Handler handler = new Handler(topic, messageBodyConsumer, consumerId); com.rabbitmq.stream.Consumer consumer = environment.consumerBuilder().stream(streamName) .offset(OffsetSpecification.first()) .name(consumerName) .manualTrackingStrategy() .builder() - .messageHandler(new Handler(topic, messageBodyConsumer, consumerId)) + .messageHandler(handler) .build(); try { logger.atInfo().log( @@ -91,6 +94,7 @@ } consumers.put(consumerId, consumer, false); + handler.start(); return consumerId; } @@ -114,11 +118,15 @@ consumers.reset(consumerId, offset); } + public OffsetInfo getCurrentOffsetInfo(String consumerId) { + return consumers.getCurrentOffsetInfo(consumerId); + } + private class Consumers { - private volatile Map<String, ConsumerPair> consumersMap = new ConcurrentHashMap<>(); + private volatile Map<String, ConsumerState> consumersMap = new ConcurrentHashMap<>(); void put(String consumerId, com.rabbitmq.stream.Consumer consumer, boolean resetOffset) { - consumersMap.put(consumerId, new ConsumerPair(consumer, resetOffset)); + consumersMap.put(consumerId, new ConsumerState(consumer, resetOffset)); } boolean isReset(String consumerId) { @@ -135,18 +143,37 @@ } } + void updateCurrentOffset(String consumerId, long offset) { + ConsumerState state = consumersMap.get(consumerId); + if (state != null) { + logger.atFinest().log("Update current offset to %d for consumer %s", offset, consumerId); + state.currentOffset.set(offset); + state.lastOffsetUpdateTime.set(System.currentTimeMillis()); + } else { + logger.atSevere().log("Failed to update current offset, consumer: %s does not exist", consumerId); + } + } + + OffsetInfo getCurrentOffsetInfo(String consumerId) { + ConsumerState state = consumersMap.get(consumerId); + if (state != null) { + return new OffsetInfo(state.currentOffset.get(), state.lastOffsetUpdateTime.get()); + } + return new OffsetInfo(-1L, -1L); + } + boolean closeConsumer(String consumerId) { - ConsumerPair pair = consumersMap.remove(consumerId); - if (pair == null) { + ConsumerState state = consumersMap.remove(consumerId); + if (state == null) { return false; } - pair.consumer.close(); + state.consumer.close(); return true; } void close() { synchronized (consumersMap) { - Iterator<Entry<String, ConsumerPair>> it = consumersMap.entrySet().iterator(); + Iterator<Entry<String, ConsumerState>> it = consumersMap.entrySet().iterator(); while (it.hasNext()) { it.next().getValue().consumer.close(); it.remove(); @@ -154,13 +181,24 @@ } } - private class ConsumerPair { + private class ConsumerState { com.rabbitmq.stream.Consumer consumer; boolean resetOffset; + AtomicLong currentOffset; + AtomicLong lastOffsetUpdateTime; - ConsumerPair(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) { + ConsumerState(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) { this.consumer = consumer; this.resetOffset = resetOffset; + // Initialize currentOffset with the consumer's stored offset if available + long initialOffset = -1L; + try { + initialOffset = consumer.storedOffset(); + } catch (Exception e) { + logger.atFine().withCause(e).log("Could not get initial stored offset, using -1"); + } + this.currentOffset = new AtomicLong(initialOffset); + this.lastOffsetUpdateTime = new AtomicLong(System.currentTimeMillis()); } } } @@ -170,6 +208,7 @@ private String topic; private Consumer<String> messageBodyConsumer; private String consumerId; + private volatile boolean run = false; Handler(String topic, Consumer<String> messageBodyConsumer, String consumerId) { messageConsumed = new AtomicInteger(0); @@ -180,10 +219,15 @@ @Override public void handle(MessageHandler.Context context, Message message) { + while (!run) + ; + Stream prop = properties.getSection(Stream.class); try { logger.atFiner().log( "Consume message from topic %s with offset %d", topic, context.offset()); + + consumers.updateCurrentOffset(consumerId, context.offset()); messageBodyConsumer.accept(new String(message.getBodyAsBinary(), "UTF-8")); if (messageConsumed.incrementAndGet() % prop.windowSize == 0) { com.rabbitmq.stream.Consumer consumer = context.consumer(); @@ -200,5 +244,9 @@ "Error handling stream message with id %d", message.getPublishingId()); } } + + void start() { + run = true; + } } }
diff --git a/src/main/resources/Documentation/rest-api-replay-events.md b/src/main/resources/Documentation/rest-api-replay-events.md new file mode 100644 index 0000000..da7091d --- /dev/null +++ b/src/main/resources/Documentation/rest-api-replay-events.md
@@ -0,0 +1,100 @@ +@PLUGIN@ POST topics/{topic}/replay +==================================== + +SYNOPSIS +-------- + +``` +POST /config/server/events-rabbitmq~topics/{topic}/replay +``` + +DESCRIPTION +----------- +Replays all events from a specific offset for a given RabbitMQ topic in the events-rabbitmq plugin. + +This endpoint allows administrators to replay messages from a specific point in the stream, which +is useful for: +* Recovering from message processing errors +* Re-processing events after configuration changes +* Debugging message handling issues +* Backfilling data after system maintenance + +The replay operation will start from the specified offset and process all subsequent messages in +the topic. + +ACCESS +------ +**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability. + +PARAMETERS +---------- +**topic**: The name of the RabbitMQ topic to replay events from. This is specified as a path parameter +in the URL. + +REQUEST BODY +------------ +The request must include a JSON body with the following field: + +* **offset** (number, required): The offset position to start replaying from. Can be: + - A positive number for an absolute offset position + - A negative number for a relative offset from the current maximum offset (e.g., -10 means + "10 messages back from current position") + +EXAMPLES +-------- + +Replay events from absolute offset 1000 for the "gerrit" topic: + +``` +curl -X POST --user admin:secret \ + -H "Content-Type: application/json" \ + -d '{"offset": 1000}' \ + http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay +``` + +Replay starting 50 messages back from the current position for the "gerrit" topic: + +``` +curl -X POST --user admin:secret \ + -H "Content-Type: application/json" \ + -d '{"offset": -50}' \ + http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay +``` + +Response: + +``` +)]}' +{ + "startOffset": 1000 +} +``` + +**Response Fields:** + +* **startOffset** (number): The calculated absolute offset position where the replay operation +started. + +NOTES +----- + +* **Negative offsets**: When using negative offsets, the system calculates the absolute position by +finding the maximum current offset from all consumers for the topic and adding the negative offset +value. If the calculated position would be less than 0, it starts from offset 0. +* **Consumer reset**: The replay operation will reset and recreate consumers for the specified topic. +* **Offset behavior**: The offset specifies the starting position for replay: + - **Offset 0**: The first event (at offset 0) will be the first event processed + - **Offset n**: The event at offset n+1 will be the first event processed (offset n is used as the starting point, then processing begins from the next message) + +SEE ALSO +-------- + +* [GET topics/{topic}/offsets](rest-api-topics-offsets-get.html) +* [Plugin Configuration](config.html) +* [Message Processing](message.html) +* [Plugin Development](../../../Documentation/dev-plugins.html) +* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details) + +GERRIT +------ +Part of [Gerrit Code Review](../../../Documentation/index.html)
diff --git a/src/main/resources/Documentation/rest-api-topics-offsets-get.md b/src/main/resources/Documentation/rest-api-topics-offsets-get.md new file mode 100644 index 0000000..d5aa275 --- /dev/null +++ b/src/main/resources/Documentation/rest-api-topics-offsets-get.md
@@ -0,0 +1,87 @@ +@PLUGIN@ GET topics/{topic}/offsets +===================================== + +SYNOPSIS +-------- + +``` +GET /config/server/events-rabbitmq~topics/{topic}/offsets +``` + +DESCRIPTION +----------- +Gets the current consumer offsets for a specific RabbitMQ topic in the events-rabbitmq plugin. + +This endpoint allows administrators to monitor the current position of consumers for a given topic, +which is useful for monitoring message processing status and identifying potential backlogs. + +ACCESS +------ +**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability. + +PARAMETERS +---------- +**topic**: The name of the RabbitMQ topic to get offsets for. This is specified as a path parameter +in the URL. + +EXAMPLES +-------- + +Get the current offsets for the "gerrit" topic: + +``` +curl -X GET --user admin:secret \ + http://host:port/a/config/server/events-rabbitmq~topics/gerrit/offsets +``` + +Response: + +``` +)]}' +{ + "offsets": [ + { + "offset": 162, + "lastUpdated": "2025-10-07T14:30:15.123Z" + }, + { + "offset": 158, + "lastUpdated": "2025-10-07T14:30:12.456Z" + }, + { + "offset": 145, + "lastUpdated": "2025-10-07T14:29:58.789Z" + } + ] +} +``` + +**Response Fields:** + +* **offsets** (array): List of offset information objects for each consumer on this topic. + * **offset** (number): The current offset position of the consumer. + * **lastUpdated** (string): ISO 8601 timestamp indicating when this offset was last updated. + +NOTES +----- + +* Each consumer may be at a different offset position depending on processing speed and when it was +started. +* Offset values represent the position of the last processed message for each consumer. +* An offset value of -1 indicates that the consumer hasn't processed any messages yet. +* The `lastUpdated` timestamp shows when the offset was last modified, which helps identify stale +or inactive consumers. +* All timestamps are returned in UTC timezone using ISO 8601 format. +* The list contains one offset information object per consumer for the specified topic. + +SEE ALSO +-------- + +* [POST topics/{topic}/replay](rest-api-replay-events.html) +* [Plugin Configuration](config.html) +* [Plugin Development](../../../Documentation/dev-plugins.html) +* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details) + +GERRIT +------ +Part of [Gerrit Code Review](../../../Documentation/index.html)