Add support for RabbitMQ streams
With streams you will be able to use the replay
functionality. To keep track of the current message an
offset is used. The offset is periodically saved/stored and
when replayAll is called the consumer start to consume from the
beginning of the stream.
Every stream.windowSize we store
(current offset - stream.windowSize) as the offset to make
sure we replay missed messages when we restart a multi-site
instance.
Change-Id: I51a49533248ce94d1e6831071b033869c86db6d8
diff --git a/BUILD b/BUILD
index 441ad42..58248ca 100644
--- a/BUILD
+++ b/BUILD
@@ -13,6 +13,23 @@
deps = [
":events-broker-neverlink",
"@amqp_client//jar",
+ "@stream_client//jar",
+ "@zstd_jni//jar",
+ "@netty_buffer//jar",
+ "@netty_codec//jar",
+ "@netty_codec_base//jar",
+ "@netty_codec_compression//jar",
+ "@netty_codec_marshalling//jar",
+ "@netty_codec_protobuf//jar",
+ "@netty_common//jar",
+ "@netty_handler//jar",
+ "@netty_resolver//jar",
+ "@netty_transport//jar",
+ "@netty_transport_native_unix_common//jar",
+ "@proton_j//jar",
+ "@lz4_java//jar",
+ "@slf4j_api//jar",
+ "@snappy_java//jar",
"@commons-codec//jar:neverlink",
"@commons-io//jar",
"@commons-lang3//jar:neverlink",
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 2827eee..3cda8de 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -6,3 +6,106 @@
artifact = "com.rabbitmq:amqp-client:5.10.0",
sha1 = "4de351467a13b8ca4eb7e8023032f9f964a21796",
)
+
+ maven_jar(
+ name = "stream_client",
+ artifact = "com.rabbitmq:stream-client:1.1.0",
+ sha1 = "585736dd2da4d3da6e72adeef4103cdb362435eb",
+ )
+
+ # Transitive dependencies for stream-client below
+ maven_jar(
+ name = "zstd_jni",
+ artifact = "com.github.luben:zstd-jni:1.5.7-3",
+ sha1 = "dc55c256583fb810f9fe3ede271843b68132fb7c",
+ )
+
+ maven_jar(
+ name = "netty_buffer",
+ artifact = "io.netty:netty-buffer:4.2.1.Final",
+ sha1 = "095e48c931eb31614c06adf775a2118f4297fd97",
+ )
+
+ maven_jar(
+ name = "netty_codec",
+ artifact = "io.netty:netty-codec:4.2.1.Final",
+ sha1 = "55421a0c16ccd25692613e86f75b5a3f8db3ea82",
+ )
+
+ maven_jar(
+ name = "netty_codec_base",
+ artifact = "io.netty:netty-codec-base:4.2.1.Final",
+ sha1 = "ee2a21f30549d6fb0ec1da71ac748c385368d807",
+ )
+
+ maven_jar(
+ name = "netty_codec_compression",
+ artifact = "io.netty:netty-codec-compression:4.2.1.Final",
+ sha1 = "b85ef35b94d1b540c021df6bb778cc8dc086588b",
+ )
+
+ maven_jar(
+ name = "netty_codec_marshalling",
+ artifact = "io.netty:netty-codec-marshalling:4.2.1.Final",
+ sha1 = "838fd5dc1ae854879a9e553a0d217abfd5d7fda9",
+ )
+
+ maven_jar(
+ name = "netty_codec_protobuf",
+ artifact = "io.netty:netty-codec-protobuf:4.2.1.Final",
+ sha1 = "086c04147717b63edadb9e2d1bd371f6091e2ba0",
+ )
+
+ maven_jar(
+ name = "netty_common",
+ artifact = "io.netty:netty-common:4.2.1.Final",
+ sha1 = "ed99ce89380bc3e9b297c9a5d87fbb6e669e1cf4",
+ )
+
+ maven_jar(
+ name = "netty_handler",
+ artifact = "io.netty:netty-handler:4.2.1.Final",
+ sha1 = "1673ad3e66acb5bfaa2dab839ee0d6b22225fbee",
+ )
+
+ maven_jar(
+ name = "netty_resolver",
+ artifact = "io.netty:netty-resolver:4.2.1.Final",
+ sha1 = "80a111e04ee696cca0fe928b7a64787551a848e4",
+ )
+
+ maven_jar(
+ name = "netty_transport",
+ artifact = "io.netty:netty-transport:4.2.1.Final",
+ sha1 = "5c9140fffd02fe833a570dfd00c3dced88c72abd",
+ )
+
+ maven_jar(
+ name = "netty_transport_native_unix_common",
+ artifact = "io.netty:netty-transport-native-unix-common:4.2.1.Final",
+ sha1 = "9edcc29edb12cb33890e6de025fbb60cc3846d6d",
+ )
+
+ maven_jar(
+ name = "proton_j",
+ artifact = "org.apache.qpid:proton-j:0.34.1",
+ sha1 = "e0d6c62cef4929db66dd6df55bee699b2274a9cc",
+ )
+
+ maven_jar(
+ name = "lz4_java",
+ artifact = "org.lz4:lz4-java:1.8.0",
+ sha1 = "4b986a99445e49ea5fbf5d149c4b63f6ed6c6780",
+ )
+
+ maven_jar(
+ name = "slf4j_api",
+ artifact = "org.slf4j:slf4j-api:1.7.36",
+ sha1 = "6c62681a2f655b49963a5983b8b0950a6120ae14",
+ )
+
+ maven_jar(
+ name = "snappy_java",
+ artifact = "org.xerial.snappy:snappy-java:1.1.10.7",
+ sha1 = "3049f95640f4625a945cfab85715f603fa4c8f80",
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 56acd18..6984f98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -41,6 +41,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Message;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Section;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
import com.googlesource.gerrit.plugins.rabbitmq.message.BaseProperties;
import com.googlesource.gerrit.plugins.rabbitmq.message.BasePropertiesProvider;
import com.googlesource.gerrit.plugins.rabbitmq.message.GerritEventPublisher;
@@ -50,7 +51,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.session.PublisherSession;
import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPPublisherSession;
-import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPSubscriberSession;
+import com.googlesource.gerrit.plugins.rabbitmq.session.type.SubscriberSessionFactoryImpl;
import com.googlesource.gerrit.plugins.rabbitmq.worker.DefaultEventWorker;
import com.googlesource.gerrit.plugins.rabbitmq.worker.EventWorker;
import com.googlesource.gerrit.plugins.rabbitmq.worker.EventWorkerFactory;
@@ -97,6 +98,7 @@
sectionBinder.addBinding().to(Message.class);
sectionBinder.addBinding().to(Monitor.class);
sectionBinder.addBinding().to(General.class);
+ sectionBinder.addBinding().to(Stream.class);
install(
new FactoryModuleBuilder()
@@ -104,10 +106,6 @@
.build(AMQPPublisherSession.Factory.class));
install(
new FactoryModuleBuilder()
- .implement(SubscriberSession.class, AMQPSubscriberSession.class)
- .build(AMQPSubscriberSession.Factory.class));
- install(
- new FactoryModuleBuilder()
.implement(Publisher.class, GerritEventPublisher.class)
.build(GerritEventPublisherFactory.class));
install(
@@ -118,6 +116,9 @@
new FactoryModuleBuilder()
.implement(EventWorker.class, UserEventWorker.class)
.build(EventWorkerFactory.class));
+ bind(SubscriberSession.Factory.class)
+ .to(SubscriberSessionFactoryImpl.class)
+ .in(Singleton.class);
bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
index ed72e1c..649ac8a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
@@ -74,8 +74,11 @@
@Override
public void replayAllEvents(String topic) {
- throw new NotImplementedException(
- "The RabbitMqBrokerApi does not support replayAllEvents yet.");
+ for (TopicSubscriber topicSubscriber : topicSubscribers) {
+ if (topicSubscriber.topic().equals(topic)) {
+ subscribers.replayAllEvents(topicSubscriber);
+ }
+ }
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Stream.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Stream.java
new file mode 100644
index 0000000..7ac3059
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Stream.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.config.section;
+
+import com.google.common.flogger.FluentLogger;
+import com.googlesource.gerrit.plugins.rabbitmq.annotation.Default;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Stream implements Section {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ @Default("false")
+ public Boolean enabled;
+
+ @Default public String uri;
+
+ @Default public String streamPrefix;
+
+ @Default public String consumerPrefix;
+
+ @Default("500")
+ public Integer windowSize;
+
+ public boolean isValid() {
+ List<String> missingSettings = new ArrayList<>(3);
+
+ if (uri.isEmpty()) missingSettings.add("uri");
+ if (streamPrefix.isEmpty()) missingSettings.add("streamPrefix");
+ if (consumerPrefix.isEmpty()) missingSettings.add("consumerPrefix");
+
+ if (!missingSettings.isEmpty()) {
+ logger.atSevere().log(
+ "Missing mandatory stream settings: %s, please set these if you want to use streams.",
+ missingSettings);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
index ed80334..d7acc92 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -30,8 +30,9 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
-import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPSubscriberSession;
+import com.googlesource.gerrit.plugins.rabbitmq.session.type.StreamSubscriberSession;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -48,7 +49,7 @@
@Inject
public BrokerApiSubscribers(
- AMQPSubscriberSession.Factory sessionFactory,
+ SubscriberSession.Factory sessionFactory,
@EventGson Gson gson,
@BrokerApiProperties Properties properties) {
this.properties = properties;
@@ -136,4 +137,17 @@
}
return session.removeSubscriber(consumerTag);
}
+
+ public boolean replayAllEvents(TopicSubscriber topicSubscriber) {
+ if (properties.getSection(Stream.class).enabled) {
+ StreamSubscriberSession streamSession = (StreamSubscriberSession) session;
+ streamSession.resetOffset(consumerTags.get(topicSubscriber), 0);
+ removeSubscriber(topicSubscriber);
+ addSubscriber(topicSubscriber);
+ return true;
+ }
+ logger.atWarning().log(
+ "Only streams support the replay functionality, please enable stream support to use this");
+ return false;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SubscriberSession.java
index 5fd1577..bd0ff42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SubscriberSession.java
@@ -14,9 +14,14 @@
package com.googlesource.gerrit.plugins.rabbitmq.session;
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import java.util.function.Consumer;
public interface SubscriberSession extends Session {
+ public interface Factory {
+ SubscriberSession create(Properties properties);
+ }
+
String addSubscriber(String topic, Consumer<String> messageBodyConsumer);
boolean removeSubscriber(String consumerTag);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index dcf12e3..e06810a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -33,7 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
-public abstract class AMQPSession implements Session {
+public class AMQPSession implements Session {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private volatile Connection connection;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
index d19e023..b32537a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
@@ -38,10 +38,6 @@
public final class AMQPSubscriberSession extends AMQPSession implements SubscriberSession {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- public interface Factory {
- SubscriberSession create(Properties properties);
- }
-
private volatile Map<String, Channel> channels = new ConcurrentHashMap<>();
@Inject
@@ -69,7 +65,7 @@
try {
channel.basicQos(amqp.consumerPrefetch > 0 ? amqp.consumerPrefetch : 0);
- } catch(IOException ex) {
+ } catch (IOException ex) {
logger.atSevere().withCause(ex).log("Error when trying to set consumer prefetch");
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSession.java
new file mode 100644
index 0000000..8cf282a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSession.java
@@ -0,0 +1,78 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.session.type;
+
+import com.google.common.flogger.FluentLogger;
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.AMQP;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+import com.rabbitmq.stream.Environment;
+import com.rabbitmq.stream.EnvironmentBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+public abstract class StreamSession implements Session {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ protected volatile Environment environment;
+
+ protected final Properties properties;
+
+ public StreamSession(Properties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean isOpen() {
+ if (environment != null) {
+ logger.atFine().log("Assuming stream session is open since Environment exists");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized boolean connect() {
+ AMQP amqp = properties.getSection(AMQP.class);
+ Stream stream = properties.getSection(Stream.class);
+
+ if (!stream.isValid()) {
+ return false;
+ }
+ EnvironmentBuilder builder = Environment.builder();
+ builder.uri(stream.uri);
+ if (StringUtils.isNotEmpty(amqp.username)) {
+ builder.username(amqp.username);
+ }
+ Gerrit gerrit = properties.getSection(Gerrit.class);
+ String securePassword = gerrit.getAMQPUserPassword(amqp.username);
+ if (StringUtils.isNotEmpty(securePassword)) {
+ builder.password(securePassword);
+ } else if (StringUtils.isNotEmpty(amqp.password)) {
+ builder.password(amqp.password);
+ }
+
+ environment = builder.build();
+ logger.atInfo().log("Environment built for %s.", stream.uri);
+ return true;
+ }
+
+ @Override
+ public void disconnect() {
+ environment.close();
+ environment = null;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
new file mode 100644
index 0000000..a266beb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/StreamSubscriberSession.java
@@ -0,0 +1,204 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.session.type;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Exchange;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.stream.Message;
+import com.rabbitmq.stream.MessageHandler;
+import com.rabbitmq.stream.NoOffsetException;
+import com.rabbitmq.stream.OffsetSpecification;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public final class StreamSubscriberSession extends StreamSession implements SubscriberSession {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private volatile Consumers consumers = new Consumers();
+
+ @Inject
+ public StreamSubscriberSession(@Assisted Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ public void disconnect() {
+ logger.atInfo().log("Disconnecting subscriber session...");
+ consumers.close();
+ super.disconnect();
+ }
+
+ @Override
+ public String addSubscriber(String topic, Consumer<String> messageBodyConsumer) {
+ if (environment == null) {
+ if (!connect()) {
+ logger.atSevere().log("Failed to connect to rabbitMQ with environment");
+ return null;
+ }
+ }
+ Stream streamProp = properties.getSection(Stream.class);
+
+ String streamName = streamProp.streamPrefix + "." + topic;
+ String consumerName = streamProp.consumerPrefix + "." + topic;
+ if (!environment.streamExists(streamName)) {
+ environment.streamCreator().stream(streamName).create();
+ }
+
+ bindStreamToExchange(streamName, topic);
+
+ String consumerId = UUID.randomUUID().toString();
+ com.rabbitmq.stream.Consumer consumer =
+ environment.consumerBuilder().stream(streamName)
+ .offset(OffsetSpecification.first())
+ .name(consumerName)
+ .manualTrackingStrategy()
+ .builder()
+ .messageHandler(new Handler(topic, messageBodyConsumer, consumerId))
+ .build();
+ try {
+ logger.atInfo().log(
+ "Consumer added for topic %s, consuming from stream-offset %d",
+ topic, consumer.storedOffset());
+ } catch (NoOffsetException ex) {
+ logger.atInfo().withCause(ex).log(
+ "No offset found for consumer that listens on topic %s, consuming from start of stream.",
+ topic);
+ }
+
+ consumers.put(consumerId, consumer, false);
+ return consumerId;
+ }
+
+ private void bindStreamToExchange(String streamName, String topic) {
+ String exchangeName = properties.getSection(Exchange.class).name;
+ AMQPSession session = new AMQPSession(properties);
+ try (Channel channel = session.createChannel()) {
+ channel.queueBind(streamName, exchangeName, topic);
+ } catch (IOException | TimeoutException ex) {
+ logger.atSevere().withCause(ex).log("Failed to bind stream to exchange or close channel");
+ }
+ session.disconnect();
+ }
+
+ @Override
+ public boolean removeSubscriber(String consumerId) {
+ return consumers.closeConsumer(consumerId);
+ }
+
+ public void resetOffset(String consumerId, long offset) {
+ consumers.reset(consumerId, offset);
+ }
+
+ private class Consumers {
+ private volatile Map<String, ConsumerPair> consumersMap = new ConcurrentHashMap<>();
+
+ void put(String consumerId, com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
+ consumersMap.put(consumerId, new ConsumerPair(consumer, resetOffset));
+ }
+
+ boolean isReset(String consumerId) {
+ return consumersMap.get(consumerId).resetOffset;
+ }
+
+ void reset(String consumerId, long offset) {
+ com.rabbitmq.stream.Consumer consumer = consumersMap.get(consumerId).consumer;
+ if (consumer != null) {
+ synchronized (consumer) {
+ consumersMap.get(consumerId).resetOffset = true;
+ consumer.store(offset);
+ }
+ }
+ }
+
+ boolean closeConsumer(String consumerId) {
+ ConsumerPair pair = consumersMap.remove(consumerId);
+ if (pair == null) {
+ return false;
+ }
+ pair.consumer.close();
+ return true;
+ }
+
+ void close() {
+ synchronized (consumersMap) {
+ Iterator<Entry<String, ConsumerPair>> it = consumersMap.entrySet().iterator();
+ while (it.hasNext()) {
+ it.next().getValue().consumer.close();
+ it.remove();
+ }
+ }
+ }
+
+ private class ConsumerPair {
+ com.rabbitmq.stream.Consumer consumer;
+ boolean resetOffset;
+
+ ConsumerPair(com.rabbitmq.stream.Consumer consumer, boolean resetOffset) {
+ this.consumer = consumer;
+ this.resetOffset = resetOffset;
+ }
+ }
+ }
+
+ private class Handler implements MessageHandler {
+ private AtomicInteger messageConsumed;
+ private String topic;
+ private Consumer<String> messageBodyConsumer;
+ private String consumerId;
+
+ Handler(String topic, Consumer<String> messageBodyConsumer, String consumerId) {
+ messageConsumed = new AtomicInteger(0);
+ this.topic = topic;
+ this.messageBodyConsumer = messageBodyConsumer;
+ this.consumerId = consumerId;
+ }
+
+ @Override
+ public void handle(MessageHandler.Context context, Message message) {
+ Stream prop = properties.getSection(Stream.class);
+ try {
+ logger.atFiner().log(
+ "Consume message from topic %s with offset %d", topic, context.offset());
+ messageBodyConsumer.accept(new String(message.getBodyAsBinary(), "UTF-8"));
+ if (messageConsumed.incrementAndGet() % prop.windowSize == 0) {
+ com.rabbitmq.stream.Consumer consumer = context.consumer();
+ long newOffset = Math.max(context.offset() - prop.windowSize, 0);
+ logger.atFine().log("Store new offset %d for stream with topic %s", newOffset, topic);
+ synchronized (consumer) {
+ if (!consumers.isReset(consumerId)) {
+ consumer.store(newOffset);
+ }
+ }
+ }
+ } catch (IOException ex) {
+ logger.atSevere().withCause(ex).log(
+ "Error handling stream message with id %d", message.getPublishingId());
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/SubscriberSessionFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/SubscriberSessionFactoryImpl.java
new file mode 100644
index 0000000..57de967
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/SubscriberSessionFactoryImpl.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.session.type;
+
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Stream;
+import com.googlesource.gerrit.plugins.rabbitmq.session.SubscriberSession;
+
+public class SubscriberSessionFactoryImpl implements SubscriberSession.Factory {
+
+ public SubscriberSession create(Properties properties) {
+ if (properties.getSection(Stream.class).enabled) {
+ return new StreamSubscriberSession(properties);
+ }
+ return new AMQPSubscriberSession(properties);
+ }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 54cdee8..3b18e98 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -24,6 +24,12 @@
other configs with the additions of queuePrefix, durable, exclusive and autoDelete that decides
queue properties and consumerPrefetch that is a channel property related to consumers. The
event-broker API use its own publisher that is separate from the previously mentioned publisher.
+You can use streams instead of queues if you enable it with stream.enabled. There is a separate
+set of configuration options for streams.
+Note: If you get certificate host verfication failures the problem may be that the RabbitMQ cluster
+advertise the hostname without the domain. You can verify this by running "rabbitmqctl eval
+'rabbit_stream:tls_host().'" and it can be solved by setting stream.advertised_tls_host as the
+FQDN. For more info check the RabbitMQ documentation.
Secure.config
---------------------
@@ -93,6 +99,30 @@
* `exchange.name`
* The name of exchange.
+* `stream.enabled`
+ * Make the brokerApi subscribe on streams instead of queues, defaults to false. Only used in
+ broker.config.
+
+* `stream.uri`
+ * The URI of RabbitMQ server's endpoint that should be used to create a stream. This needs to
+ be set to use streams. Only used in broker.config.
+
+* `stream.streamPrefix`
+ * The streams that store the messages of the the subscribed topics will be named
+ prefix + '.' + topic. This needs to be set to use streams. Only used in
+ broker.config.
+
+* `stream.consumerPrefix`
+ * The consumers that consume from the streams will be named prefix + '.' + topic. The name is
+ used by the Rabbitmq stream library to retrieve the correct offset from the server. The prefix
+ needs to be unique for each Gerrit instance. This needs to be set to use streams. Only used in
+ broker.config.
+
+* `stream.windowSize`
+ * How many messages that needs to be consumed until we set a new offset. New offset will be
+ offset of the currently proccessed message subtracted by `windowSize`, defaults to 500. Only
+ used in broker.config.
+
* `general.publishAllGerritEvents`
* Will publish gerrit stream events to configured exchange automatically if enabled, defaults
to true.