Make events-rabbitmq manually ack messages after they are consumed
To avoid losing messages that has been taken from the queue
but not consummed by consumers during a restart of Gerrit.
Change-Id: I4397a04fc7971a4b8e60f8c6cc7eec3de630ccfd
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
index 0e28e0d..9efeb1a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
@@ -78,10 +78,11 @@
}
channel.queueBind(queueName, exchangeName, topic);
+ boolean autoAck = false;
String consumerTag =
channel.basicConsume(
queueName,
- true,
+ autoAck,
new MessageConsumer(channel, queueName, topic, messageBodyConsumer));
logger.atInfo().log("Subscribed to queue with name %s", queueName);
if (consumerTag != null) {
@@ -139,6 +140,13 @@
String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws UnsupportedEncodingException {
messageBodyConsumer.accept(new String(body, "UTF-8"));
+ long deliveryTag = envelope.getDeliveryTag();
+ try {
+ getChannel().basicAck(deliveryTag, false);
+ } catch (IOException ex) {
+ logger.atSevere().withCause(ex).log(
+ "Error when acknowledging message with sequence number %d", deliveryTag);
+ }
}
@Override