Add support for consumer prefetch
If no consumer prefetch is set it becomes unlimited which
means that the rabbitmq server will send as many events as
it can, this can cause the client to be overwhelmed and make
it slow to respond.
When the client respond to slowly the server may decide to
close the connection which causes the channels and consumers
to be lost.
With a limited consumer prefetch we can prevent this.
Bug: Issue 366272560
Change-Id: I599aa024202597c468bd3f792ec0931f40a495f9
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/AMQP.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/AMQP.java
index 4efdb35..7b001fd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/AMQP.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/AMQP.java
@@ -37,4 +37,7 @@
public Boolean autoDelete;
@Default public String queuePrefix;
+
+ @Default("300")
+ public Integer consumerPrefetch;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
index 9efeb1a..d19e023 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
@@ -65,11 +65,18 @@
@Override
public String addSubscriber(String topic, Consumer<String> messageBodyConsumer) {
Channel channel = createChannel();
+ AMQP amqp = properties.getSection(AMQP.class);
+
+ try {
+ channel.basicQos(amqp.consumerPrefetch > 0 ? amqp.consumerPrefetch : 0);
+ } catch(IOException ex) {
+ logger.atSevere().withCause(ex).log("Error when trying to set consumer prefetch");
+ }
+
if (channel != null && channel.isOpen()) {
String exchangeName = properties.getSection(Exchange.class).name;
try {
String queueName;
- AMQP amqp = properties.getSection(AMQP.class);
if (!amqp.queuePrefix.isEmpty()) {
queueName = amqp.queuePrefix + "." + topic;
channel.queueDeclare(queueName, amqp.durable, amqp.exclusive, amqp.autoDelete, null);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 2a0fad0..54cdee8 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -22,8 +22,8 @@
To make specific configurations for the events-broker API you do those in
`$site_path/data/@PLUGIN@/broker/broker.config`. You can use the same configuration options as the
other configs with the additions of queuePrefix, durable, exclusive and autoDelete that decides
-queue properties. The event-broker API use its own publisher that is separate from the previously
-mentioned publisher.
+queue properties and consumerPrefetch that is a channel property related to consumers. The
+event-broker API use its own publisher that is separate from the previously mentioned publisher.
Secure.config
---------------------
@@ -87,6 +87,9 @@
* Make the queues automatically deleted when their last consummer stop subscribing. Only used
in broker.config and is only used if `amqp.queuePrefix` is specified.
+* `amqp.consumerPrefetch`
+ * Decide how many events the client can queue for a consumer, defaults to 300.
+
* `exchange.name`
* The name of exchange.