Add REST API endpoints for handling stream offsets for topics

* A GET endpoint that can be used to get current offsets
for a topic and when they were last updated
* A POST endpoint to move offsets for a specific topic
either with an absolute value or relatively with a negative
value

Change-Id: I9cce853fdd233c1324e846c84c1fd1c99971e54e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 6984f98..6b40a13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -48,6 +48,7 @@
 import com.googlesource.gerrit.plugins.rabbitmq.message.GerritEventPublisherFactory;
 import com.googlesource.gerrit.plugins.rabbitmq.message.GsonProvider;
 import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
+import com.googlesource.gerrit.plugins.rabbitmq.rest.RestModule;
 import com.googlesource.gerrit.plugins.rabbitmq.session.PublisherSession;
 import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
 import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPPublisherSession;
@@ -116,6 +117,7 @@
         new FactoryModuleBuilder()
             .implement(EventWorker.class, UserEventWorker.class)
             .build(EventWorkerFactory.class));
+    install(new RestModule());
     bind(SubscriberSession.Factory.class)
         .to(SubscriberSessionFactoryImpl.class)
         .in(Singleton.class);
@@ -133,7 +135,8 @@
       install(rabbitMqBrokerApiModule);
     } else {
       logger.atInfo().log(
-          "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable it");
+          "The RabbitMqBrokerApi is disabled, set enableBrokerApi to true if you want to enable"
+              + " it");
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
index d7acc92..1c474c2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -23,6 +23,8 @@
 import com.github.rholder.retry.StopStrategies;
 import com.github.rholder.retry.WaitStrategies;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.restapi.PreconditionFailedException;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
 import com.google.gson.Gson;
@@ -31,9 +33,12 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
 import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
 import com.googlesource.gerrit.plugins.rabbitmq.session.type.StreamSubscriberSession;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -119,7 +124,8 @@
           });
     } catch (RetryException e) {
       logger.atSevere().withCause(e).log(
-          "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a plugin missing or failing to load.",
+          "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a"
+              + " plugin missing or failing to load.",
           messageBody, retryTime);
       return null;
     } catch (ExecutionException e) {
@@ -150,4 +156,52 @@
         "Only streams support the replay functionality, please enable stream support to use this");
     return false;
   }
+
+  public void replayAllEventsAt(String topic, long offset)
+      throws ResourceNotFoundException, PreconditionFailedException {
+
+    if (!properties.getSection(Stream.class).enabled) {
+      throw new PreconditionFailedException("Stream support must be enabled to use replay functionality");
+    }
+    boolean found = false;
+    StreamSubscriberSession streamSession = (StreamSubscriberSession) session;
+
+    // Create a copy to avoid ConcurrentModificationException
+    Map<TopicSubscriber, String> consumerTagsCopy = new HashMap<>(consumerTags);
+    for (Map.Entry<TopicSubscriber, String> entry : consumerTagsCopy.entrySet()) {
+      TopicSubscriber topicSubscriber = entry.getKey();
+      String consumerTag = entry.getValue();
+      if (topicSubscriber.topic().equals(topic)) {
+        streamSession.resetOffset(consumerTag, offset);
+        removeSubscriber(topicSubscriber);
+        addSubscriber(topicSubscriber);
+        found = true;
+      }
+    }
+    if (!found) {
+      throw new ResourceNotFoundException("No subscriber found for topic " + topic);
+    }
+  }
+
+  public List<OffsetInfo> getOffsetInfoForTopic(String topic)
+      throws ResourceNotFoundException, PreconditionFailedException {
+
+    if (!properties.getSection(Stream.class).enabled) {
+      throw new PreconditionFailedException("Stream support must be enabled to query offsets");
+    }
+    List<OffsetInfo> offsetInfos = new ArrayList<>();
+    boolean found = false;
+    StreamSubscriberSession streamSession = (StreamSubscriberSession) session;
+    // NOTE(review): iterates the live map; a concurrent replay may mutate consumerTags - confirm
+    for (Map.Entry<TopicSubscriber, String> entry : consumerTags.entrySet()) {
+      if (entry.getKey().topic().equals(topic)) {
+        offsetInfos.add(streamSession.getCurrentOffsetInfo(entry.getValue()));
+        found = true;
+      }
+    }
+    if (!found) {
+      throw new ResourceNotFoundException("No subscriber found for topic " + topic);
+    }
+    return offsetInfos;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java
new file mode 100644
index 0000000..c33e1e2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/GetOffsetsForTopic.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestReadView;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
+import java.util.ArrayList;
+import java.util.List;
+
+/** REST endpoint for getting current offsets for a specific topic. */
+@Singleton
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+public class GetOffsetsForTopic implements RestReadView<TopicResource> {
+
+  /** JSON response body holding one offset entry per subscriber found for the topic. */
+  static class Output {
+    public List<OffsetInfo> offsets;
+
+    Output(List<OffsetInfo> offsets) {
+      this.offsets = offsets;
+    }
+  }
+
+  private final BrokerApiSubscribers brokerApiSubscribers;
+
+  @Inject
+  public GetOffsetsForTopic(BrokerApiSubscribers brokerApiSubscribers) {
+    this.brokerApiSubscribers = brokerApiSubscribers;
+  }
+
+  @Override
+  public Response<Output> apply(TopicResource resource) throws RestApiException {
+    List<OffsetInfo> offsets = brokerApiSubscribers.getOffsetInfoForTopic(resource.getTopicName());
+    return Response.ok(new Output(offsets));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java
new file mode 100644
index 0000000..41b5f8b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/ReplayEventsAtOffset.java
@@ -0,0 +1,99 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
+import java.util.List;
+
+/** REST endpoint for replaying all events from a specific offset. */
+@Singleton
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+public class ReplayEventsAtOffset
+    implements RestModifyView<TopicResource, ReplayEventsAtOffset.Input> {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  static class Input {
+    Long offset;
+  }
+
+  static class Output {
+    Long startOffset;
+
+    Output(Long startOffset) {
+      this.startOffset = startOffset;
+    }
+  }
+
+  private final BrokerApiSubscribers brokerApiSubscribers;
+
+  @Inject
+  public ReplayEventsAtOffset(BrokerApiSubscribers brokerApiSubscribers) {
+    this.brokerApiSubscribers = brokerApiSubscribers;
+  }
+
+  @Override
+  public Response<Output> apply(TopicResource resource, Input input) throws RestApiException {
+    if (input == null) {
+      throw new BadRequestException("Request body is required");
+    }
+
+    if (input.offset == null) {
+      throw new BadRequestException("offset field is required");
+    }
+
+    String topic = resource.getTopicName();
+    long replayOffset = input.offset;
+
+    // If offset is negative, calculate relative to current offset
+    if (input.offset < 0) {
+      List<OffsetInfo> offsetInfos = brokerApiSubscribers.getOffsetInfoForTopic(topic);
+
+      if (offsetInfos.isEmpty()) {
+        throw new ResourceNotFoundException("Topic not found: " + topic);
+      }
+
+      // Use the maximum current offset for calculation. An offset of -1 means that nothing has
+      // been consumed yet, so it is treated as 0 before the negative relative offset is added.
+      long currentOffset = offsetInfos.stream().mapToLong(info -> info.offset).max().orElse(0);
+
+      // A non-negative base plus a negative long cannot overflow (not even for Long.MIN_VALUE);
+      // the outer clamp keeps the replay from starting before offset 0.
+      replayOffset = Math.max(0L, Math.max(0L, currentOffset) + input.offset);
+
+      logger.atFine().log(
+          "Calculated replay offset %d from current offset %d with relative offset %d for topic %s",
+          replayOffset, currentOffset, input.offset, topic);
+    }
+
+    brokerApiSubscribers.replayAllEventsAt(topic, replayOffset);
+
+    logger.atFine().log(
+        "Successfully replayed events for topic %s at offset %d", topic, replayOffset);
+
+    return Response.ok(new Output(replayOffset));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java
new file mode 100644
index 0000000..8aad78e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/RestModule.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import static com.google.gerrit.server.config.ConfigResource.CONFIG_KIND;
+
+import com.google.gerrit.extensions.registration.DynamicMap;
+import com.google.gerrit.extensions.restapi.RestApiModule;
+
+public class RestModule extends RestApiModule {
+
+  @Override
+  protected void configure() {
+    // Register the TopicResource kind so Gerrit knows about it
+    DynamicMap.mapOf(binder(), TopicResource.TOPIC_KIND);
+    // Register topics as a child collection
+    child(CONFIG_KIND, "topics").to(TopicsCollection.class);
+    get(TopicResource.TOPIC_KIND, "offsets").to(GetOffsetsForTopic.class);
+    post(TopicResource.TOPIC_KIND, "replay").to(ReplayEventsAtOffset.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java
new file mode 100644
index 0000000..ed22b70
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicResource.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.gerrit.extensions.restapi.RestResource;
+import com.google.gerrit.extensions.restapi.RestView;
+import com.google.inject.TypeLiteral;
+
+/** Resource representing a specific topic in the RabbitMQ plugin. */
+public class TopicResource implements RestResource {
+  public static final TypeLiteral<RestView<TopicResource>> TOPIC_KIND =
+      new TypeLiteral<RestView<TopicResource>>() {};
+
+  private final String topicName;
+
+  public TopicResource(String topicName) {
+    this.topicName = topicName;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java
new file mode 100644
index 0000000..d831908
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/rest/TopicsCollection.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.rest;
+
+import com.google.gerrit.extensions.registration.DynamicMap;
+import com.google.gerrit.extensions.restapi.ChildCollection;
+import com.google.gerrit.extensions.restapi.IdString;
+import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
+import com.google.gerrit.extensions.restapi.RestView;
+import com.google.gerrit.server.config.ConfigResource;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/** Collection of topics in the RabbitMQ plugin. */
+@Singleton
+public class TopicsCollection implements ChildCollection<ConfigResource, TopicResource> {
+
+  private final DynamicMap<RestView<TopicResource>> views;
+
+  @Inject
+  public TopicsCollection(DynamicMap<RestView<TopicResource>> views) {
+    this.views = views;
+  }
+
+  @Override
+  public RestView<ConfigResource> list() throws ResourceNotFoundException {
+    throw new ResourceNotFoundException("Topic listing not supported");
+  }
+
+  @Override
+  public TopicResource parse(ConfigResource parent, IdString id) throws ResourceNotFoundException {
+    return new TopicResource(id.get());
+  }
+
+  @Override
+  public DynamicMap<RestView<TopicResource>> views() {
+    return views;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java
new file mode 100644
index 0000000..d3e0c58
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/OffsetInfo.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.session;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/** A consumer's current offset and the time of its last update as an ISO 8601 UTC string. */
+public class OffsetInfo {
+  public final long offset;
+  public final String lastUpdated;
+
+  public OffsetInfo(long offset, long lastUpdatedMillis) {
+    this.offset = offset;
+    this.lastUpdated = Instant.ofEpochMilli(lastUpdatedMillis)
+        .atZone(ZoneId.of("UTC"))
+        .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
index a266beb..57bfa42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
@@ -20,6 +20,7 @@
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Exchange;
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.OffsetInfo;
 import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.stream.Message;
@@ -34,6 +35,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 public final class StreamSubscriberSession extends StreamSession implements SubscriberSession {
@@ -72,13 +74,14 @@
     bindStreamToExchange(streamName, topic);
 
     String consumerId = UUID.randomUUID().toString();
+    Handler handler = new Handler(topic, messageBodyConsumer, consumerId);
     com.rabbitmq.stream.Consumer consumer =
         environment.consumerBuilder().stream(streamName)
             .offset(OffsetSpecification.first())
             .name(consumerName)
             .manualTrackingStrategy()
             .builder()
-            .messageHandler(new Handler(topic, messageBodyConsumer, consumerId))
+            .messageHandler(handler)
             .build();
     try {
       logger.atInfo().log(
@@ -91,6 +94,7 @@
     }
 
     consumers.put(consumerId, consumer, false);
+    handler.start();
     return consumerId;
   }
 
@@ -114,11 +118,15 @@
     consumers.reset(consumerId, offset);
   }
 
+  public OffsetInfo getCurrentOffsetInfo(String consumerId) {
+    return consumers.getCurrentOffsetInfo(consumerId);
+  }
+
   private class Consumers {
-    private volatile Map<String, ConsumerPair> consumersMap = new ConcurrentHashMap<>();
+    private volatile Map<String, ConsumerState> consumersMap = new ConcurrentHashMap<>();
 
     void put(String consumerId, com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
-      consumersMap.put(consumerId, new ConsumerPair(consumer, resetOffset));
+      consumersMap.put(consumerId, new ConsumerState(consumer, resetOffset));
     }
 
     boolean isReset(String consumerId) {
@@ -135,18 +143,37 @@
       }
     }
 
+    void updateCurrentOffset(String consumerId, long offset) {
+      ConsumerState state = consumersMap.get(consumerId);
+      if (state != null) {
+        logger.atFinest().log("Update current offset to %d for consumer %s", offset, consumerId);
+        state.currentOffset.set(offset);
+        state.lastOffsetUpdateTime.set(System.currentTimeMillis());
+      } else {
+        logger.atSevere().log("Failed to update current offset, consumer: %s does not exist", consumerId);
+      }
+    }
+
+    OffsetInfo getCurrentOffsetInfo(String consumerId) {
+      ConsumerState state = consumersMap.get(consumerId);
+      if (state != null) {
+        return new OffsetInfo(state.currentOffset.get(), state.lastOffsetUpdateTime.get());
+      }
+      return new OffsetInfo(-1L, -1L);
+    }
+
     boolean closeConsumer(String consumerId) {
-      ConsumerPair pair = consumersMap.remove(consumerId);
-      if (pair == null) {
+      ConsumerState state = consumersMap.remove(consumerId);
+      if (state == null) {
         return false;
       }
-      pair.consumer.close();
+      state.consumer.close();
       return true;
     }
 
     void close() {
       synchronized (consumersMap) {
-        Iterator<Entry<String, ConsumerPair>> it = consumersMap.entrySet().iterator();
+        Iterator<Entry<String, ConsumerState>> it = consumersMap.entrySet().iterator();
         while (it.hasNext()) {
           it.next().getValue().consumer.close();
           it.remove();
@@ -154,13 +181,24 @@
       }
     }
 
-    private class ConsumerPair {
+    private class ConsumerState {
       com.rabbitmq.stream.Consumer consumer;
       boolean resetOffset;
+      AtomicLong currentOffset;
+      AtomicLong lastOffsetUpdateTime;
 
-      ConsumerPair(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
+      ConsumerState(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
         this.consumer = consumer;
         this.resetOffset = resetOffset;
+        // Initialize currentOffset with the consumer's stored offset if available
+        long initialOffset = -1L;
+        try {
+          initialOffset = consumer.storedOffset();
+        } catch (Exception e) {
+          logger.atFine().withCause(e).log("Could not get initial stored offset, using -1");
+        }
+        this.currentOffset = new AtomicLong(initialOffset);
+        this.lastOffsetUpdateTime = new AtomicLong(System.currentTimeMillis());
       }
     }
   }
@@ -170,6 +208,7 @@
     private String topic;
     private Consumer<String> messageBodyConsumer;
     private String consumerId;
+    private volatile boolean run = false;
 
     Handler(String topic, Consumer<String> messageBodyConsumer, String consumerId) {
       messageConsumed = new AtomicInteger(0);
@@ -180,10 +219,15 @@
 
     @Override
     public void handle(MessageHandler.Context context, Message message) {
+      while (!run)
+        ;
+
       Stream prop = properties.getSection(Stream.class);
       try {
         logger.atFiner().log(
             "Consume message from topic %s with offset %d", topic, context.offset());
+
+        consumers.updateCurrentOffset(consumerId, context.offset());
         messageBodyConsumer.accept(new String(message.getBodyAsBinary(), "UTF-8"));
         if (messageConsumed.incrementAndGet() % prop.windowSize == 0) {
           com.rabbitmq.stream.Consumer consumer = context.consumer();
@@ -200,5 +244,9 @@
             "Error handling stream message with id %d", message.getPublishingId());
       }
     }
+
+    void start() {
+      run = true;
+    }
   }
 }
diff --git a/src/main/resources/Documentation/rest-api-replay-events.md b/src/main/resources/Documentation/rest-api-replay-events.md
new file mode 100644
index 0000000..da7091d
--- /dev/null
+++ b/src/main/resources/Documentation/rest-api-replay-events.md
@@ -0,0 +1,100 @@
+@PLUGIN@ POST topics/{topic}/replay
+====================================
+
+SYNOPSIS
+--------
+
+```
+POST /config/server/events-rabbitmq~topics/{topic}/replay
+```
+
+DESCRIPTION
+-----------
+Replays all events from a specific offset for a given RabbitMQ topic in the events-rabbitmq plugin.
+
+This endpoint allows administrators to replay messages from a specific point in the stream, which
+is useful for:
+* Recovering from message processing errors
+* Re-processing events after configuration changes
+* Debugging message handling issues
+* Backfilling data after system maintenance
+
+The replay operation will start from the specified offset and process all subsequent messages in
+the topic.
+
+ACCESS
+------
+**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability.
+
+PARAMETERS
+----------
+**topic**: The name of the RabbitMQ topic to replay events from. This is specified as a path parameter
+in the URL.
+
+REQUEST BODY
+------------
+The request must include a JSON body with the following field:
+
+* **offset** (number, required): The offset position to start replaying from. Can be:
+  - A positive number for an absolute offset position
+  - A negative number for a relative offset from the current maximum offset (e.g., -10 means
+  "10 messages back from current position")
+
+EXAMPLES
+--------
+
+Replay events from absolute offset 1000 for the "gerrit" topic:
+
+```
+curl -X POST --user admin:secret \
+  -H "Content-Type: application/json" \
+  -d '{"offset": 1000}' \
+  http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay
+```
+
+Replay starting 50 messages back from the current position for the "gerrit" topic:
+
+```
+curl -X POST --user admin:secret \
+  -H "Content-Type: application/json" \
+  -d '{"offset": -50}' \
+  http://host:port/a/config/server/events-rabbitmq~topics/gerrit/replay
+```
+
+Response:
+
+```
+)]}'
+{
+   "startOffset": 1000
+}
+```
+
+**Response Fields:**
+
+* **startOffset** (number): The calculated absolute offset position where the replay operation
+started.
+
+NOTES
+-----
+
+* **Negative offsets**: When using negative offsets, the system calculates the absolute position by
+finding the maximum current offset from all consumers for the topic and adding the negative offset
+value. If the calculated position would be less than 0, it starts from offset 0.
+* **Consumer reset**: The replay operation will reset and recreate consumers for the specified topic.
+* **Offset behavior**: The offset specifies the starting position for replay:
+  - **Offset 0**: The first event (at offset 0) will be the first event processed
+  - **Offset n**: The event at offset n+1 will be the first event processed (offset n is used as the starting point, then processing begins from the next message)
+
+SEE ALSO
+--------
+
+* [GET topics/{topic}/offsets](rest-api-topics-offsets-get.html)
+* [Plugin Configuration](config.html)
+* [Message Processing](message.html)
+* [Plugin Development](../../../Documentation/dev-plugins.html)
+* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details)
+
+GERRIT
+------
+Part of [Gerrit Code Review](../../../Documentation/index.html)
diff --git a/src/main/resources/Documentation/rest-api-topics-offsets-get.md b/src/main/resources/Documentation/rest-api-topics-offsets-get.md
new file mode 100644
index 0000000..d5aa275
--- /dev/null
+++ b/src/main/resources/Documentation/rest-api-topics-offsets-get.md
@@ -0,0 +1,87 @@
+@PLUGIN@ GET topics/{topic}/offsets
+=====================================
+
+SYNOPSIS
+--------
+
+```
+GET /config/server/events-rabbitmq~topics/{topic}/offsets
+```
+
+DESCRIPTION
+-----------
+Gets the current consumer offsets for a specific RabbitMQ topic in the events-rabbitmq plugin.
+
+This endpoint allows administrators to monitor the current position of consumers for a given topic,
+which is useful for monitoring message processing status and identifying potential backlogs.
+
+ACCESS
+------
+**Administrators only.** This endpoint requires the `ADMINISTRATE_SERVER` global capability.
+
+PARAMETERS
+----------
+**topic**: The name of the RabbitMQ topic to get offsets for. This is specified as a path parameter
+in the URL.
+
+EXAMPLES
+--------
+
+Get the current offsets for the "gerrit" topic:
+
+```
+curl -X GET --user admin:secret \
+  http://host:port/a/config/server/events-rabbitmq~topics/gerrit/offsets
+```
+
+Response:
+
+```
+)]}'
+{
+   "offsets": [
+     {
+       "offset": 162,
+       "lastUpdated": "2025-10-07T14:30:15.123Z"
+     },
+     {
+       "offset": 158,
+       "lastUpdated": "2025-10-07T14:30:12.456Z"
+     },
+     {
+       "offset": 145,
+       "lastUpdated": "2025-10-07T14:29:58.789Z"
+     }
+   ]
+}
+```
+
+**Response Fields:**
+
+* **offsets** (array): List of offset information objects for each consumer on this topic.
+  * **offset** (number): The current offset position of the consumer.
+  * **lastUpdated** (string): ISO 8601 timestamp indicating when this offset was last updated.
+
+NOTES
+-----
+
+* Each consumer may be at a different offset position depending on processing speed and when it was
+started.
+* Offset values represent the position of the last processed message for each consumer.
+* An offset value of -1 indicates that the consumer hasn't processed any messages yet.
+* The `lastUpdated` timestamp shows when the offset was last modified, which helps identify stale
+or inactive consumers.
+* All timestamps are returned in UTC timezone using ISO 8601 format.
+* The list contains one offset information object per consumer for the specified topic.
+
+SEE ALSO
+--------
+
+* [POST topics/{topic}/replay](rest-api-replay-events.html)
+* [Plugin Configuration](config.html)
+* [Plugin Development](../../../Documentation/dev-plugins.html)
+* [REST API Protocol Details](../../../Documentation/rest-api.html#_protocol_details)
+
+GERRIT
+------
+Part of [Gerrit Code Review](../../../Documentation/index.html)