Make events-rabbitmq manually ack messages after they are consumed

To avoid losing messages that has been taken from the queue
but not consummed by consumers during a restart of Gerrit.

Change-Id: I4397a04fc7971a4b8e60f8c6cc7eec3de630ccfd
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
index 0e28e0d..9efeb1a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSubscriberSession.java
@@ -78,10 +78,11 @@
         }
         channel.queueBind(queueName, exchangeName, topic);
 
+        boolean autoAck = false;
         String consumerTag =
             channel.basicConsume(
                 queueName,
-                true,
+                autoAck,
                 new MessageConsumer(channel, queueName, topic, messageBodyConsumer));
         logger.atInfo().log("Subscribed to queue with name %s", queueName);
         if (consumerTag != null) {
@@ -139,6 +140,13 @@
         String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
         throws UnsupportedEncodingException {
       messageBodyConsumer.accept(new String(body, "UTF-8"));
+      long deliveryTag = envelope.getDeliveryTag();
+      try {
+        getChannel().basicAck(deliveryTag, false);
+      } catch (IOException ex) {
+        logger.atSevere().withCause(ex).log(
+            "Error when acknowledging message with sequence number %d", deliveryTag);
+      }
     }
 
     @Override