Wait for confirms of published events and add them to cache

Solves: Jira GER-1561
Change-Id: Ie0a41d2db66bfd49f01e0721c1774759d0f7f8a6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
index 8db4ce8..96e4093 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/EiffelEventHubImpl.java
@@ -205,22 +205,36 @@
     final ReentrantLock takeLock = this.takeLock;
     final ReentrantLock idLookupLock = this.idLookupLock;
     int previousCount = -1;
-    takeLock.lock();
-    idLookupLock.lock();
     try {
-      taken = null;
-      previousCount = count.get();
-      for (EiffelEvent event : events) {
-        idCache.putId(event);
-        EventKey key = EventKey.fromEvent(event);
-        eventsInQueue.remove(key);
-        count.decrementAndGet();
-        logger.atFine().log("Event %s have been ack:ed by publisher", key);
+      takeLock.lockInterruptibly();
+      idLookupLock.lockInterruptibly();
+      try {
+        taken = null;
+        previousCount = count.get();
+        for (EiffelEvent event : events) {
+          idCache.putId(event);
+          EventKey key = EventKey.fromEvent(event);
+          eventsInQueue.remove(key);
+          count.decrementAndGet();
+          logger.atFine().log("Event %s have been ack:ed by publisher", key);
+        }
+        readyForTake.signal();
+      } finally {
+        takeLock.unlock();
+        idLookupLock.unlock();
       }
-      readyForTake.signal();
-    } finally {
-      takeLock.unlock();
-      idLookupLock.unlock();
+    } catch (InterruptedException ie) {
+      logger.atWarning().log("Interupted while ack:ing events, attempting to clean up.");
+      for (EiffelEvent event : events) {
+        EventKey key = EventKey.fromEvent(event);
+        if (eventsInQueue.containsKey(key)) {
+          idCache.putId(event);
+          eventsInQueue.remove(key);
+          count.decrementAndGet();
+          logger.atFine().log("Event %s have been ack:ed by publisher", key);
+        }
+      }
+      logger.atInfo().log("Cleanup complete.");
     }
     if (previousCount == MAX_SIZE) {
       signalReadyForPut();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
index b192bb3..34f4e5d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
@@ -41,10 +41,10 @@
     try {
       while (running) {
         events = eventQueue.take(publisher.maxBatchSize());
-        boolean unpublishedEvent = !events.isEmpty();
-        while (running && unpublishedEvent) {
-          unpublishedEvent = !publisher.publish(events);
-          if (!unpublishedEvent) {
+        boolean unpublishedEvents = !events.isEmpty();
+        while (running && unpublishedEvents) {
+          unpublishedEvents = !publisher.publish(events);
+          if (!unpublishedEvents) {
             eventQueue.ack(events);
             events = null;
           } else {
@@ -83,7 +83,11 @@
   public void stop() {
     logger.atInfo().log("Stopping publisher thread");
     this.running = false;
-    thread.interrupt();
+    try {
+      thread.join(5000);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
index 564c91e..7d45b66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/mq/RabbitMqPublisher.java
@@ -26,6 +26,7 @@
 import com.rabbitmq.client.Connection;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.TimeoutException;
 
@@ -63,9 +64,20 @@
       if (config.confirmsEnabled()) {
         try {
           return channel.waitForConfirms(config.waitForConfirms());
-        } catch (InterruptedException | TimeoutException e) {
-          logger.atWarning().withCause(e).log("Failed to confirm delivery");
+        } catch (TimeoutException te) {
+          logger.atWarning().withCause(te).log("Failed to confirm delivery");
           return false;
+        } catch (InterruptedException ie) {
+          logger.atWarning().log("Interupted while waiting for confirms, new attempt");
+          try {
+            channel.waitForConfirms(2000);
+            return true;
+          } catch (InterruptedException | TimeoutException e) {
+            logger.atSevere().withCause(e).log(
+                "Failed to confirm pushed events before cleanup: %s",
+                Arrays.toString(events.toArray()));
+            return false;
+          }
         }
       }
       return true;