Do not block Gerrit's event bus when queue is full

If rabbitmq goes down and the plugin cannot send messages for a long
enough time, the queue will eventually fill up. queue.put() will block
and wait for the publisher to start consuming from the queue and block
the event bus in Gerrit until queue is non-full.

Producer:
* If queue is full throw away incoming events instead of blocking.
Consumer:
* Wait for event to be available.
* If event is available: take event and wait for an open connection.
* If publish event fails and queue is full: throw away event.

Change-Id: Idbe6a344fbaa8ae8715a5ddac7a1b1e89204f436
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index afa716f..bf0c438 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -30,7 +30,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,8 +44,9 @@
   private final Properties properties;
   private final Gson gson;
   private final Timer monitorTimer = new Timer();
-  private EventListener eventListener;
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
+  private final Object sessionMon = new Object();
+  private EventListener eventListener;
   private CancelableRunnable publisher;
   private Thread publisherThread;
 
@@ -60,15 +60,22 @@
     this.gson = gson;
     this.eventListener =
         new EventListener() {
+          private int lostEventCount = 0;
+
           @Override
           public void onEvent(Event event) {
-            try {
-              if (!publisherThread.isAlive()) {
-                publisherThread.start();
+            if (!publisherThread.isAlive()) {
+              publisherThread.start();
+            }
+            if (queue.offer(event)) {
+              if (lostEventCount > 0) {
+                LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+                lostEventCount = 0;
               }
-              queue.put(event);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Failed to queue event", e);
+            } else {
+              if (lostEventCount++ % 10 == 0) {
+                LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+              }
             }
           }
         };
@@ -81,20 +88,17 @@
           public void run() {
             while (!canceled) {
               try {
-                if (isConnected()) {
-                  Event event = queue.poll(200, TimeUnit.MILLISECONDS);
-                  if (event != null) {
-                    if (isConnected()) {
-                      publishEvent(event);
-                    } else {
-                      queue.put(event);
-                    }
+                Event event = queue.take();
+                while (!isConnected() && !canceled) {
+                  synchronized (sessionMon) {
+                    sessionMon.wait(1000);
                   }
-                } else {
-                  Thread.sleep(1000);
+                }
+                if (!publishEvent(event) && !queue.offer(event)) {
+                  LOGGER.error("Event lost: {}", gson.toJson(event));
                 }
               } catch (InterruptedException e) {
-                LOGGER.warn("Interupted while taking event", e);
+                LOGGER.warn("Interupted while waiting for event or connection.", e);
               }
             }
           }
@@ -119,14 +123,14 @@
     publisherThread = new Thread(publisher);
     publisherThread.start();
     if (!isConnected()) {
-      session.connect();
+      connect();
       monitorTimer.schedule(
           new TimerTask() {
             @Override
             public void run() {
               if (!isConnected()) {
                 LOGGER.info("#start: try to reconnect");
-                session.connect();
+                connect();
               }
             }
           },
@@ -168,7 +172,15 @@
     return session != null && session.isOpen();
   }
 
-  private void publishEvent(Event event) {
-    session.publish(gson.toJson(event));
+  private boolean publishEvent(Event event) {
+    return session.publish(gson.toJson(event));
+  }
+
+  private void connect() {
+    if (!isConnected() && session.connect()) {
+      synchronized (sessionMon) {
+        sessionMon.notifyAll();
+      }
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 180aeda..7d0669f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -16,9 +16,9 @@
 public interface Session {
   boolean isOpen();
 
-  void connect();
+  boolean connect();
 
   void disconnect();
 
-  void publish(String message);
+  boolean publish(String message);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index d0a872b..25ec9f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -133,10 +133,10 @@
   }
 
   @Override
-  public void connect() {
+  public boolean connect() {
     if (connection != null && connection.isOpen()) {
       LOGGER.info(MSG("Already connected."));
-      return;
+      return true;
     }
     AMQP amqp = properties.getSection(AMQP.class);
     LOGGER.info(MSG("Connect to {}..."), amqp.uri);
@@ -157,14 +157,17 @@
         connection = factory.newConnection();
         connection.addShutdownListener(connectionListener);
         LOGGER.info(MSG("Connection established."));
+        return true;
       }
     } catch (URISyntaxException ex) {
       LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
+
     } catch (IOException ex) {
       LOGGER.error(MSG("Connection cannot be opened."), ex);
     } catch (KeyManagementException | NoSuchAlgorithmException ex) {
       LOGGER.error(MSG("Security error when opening connection."), ex);
     }
+    return false;
   }
 
   @Override
@@ -194,7 +197,7 @@
   }
 
   @Override
-  public void publish(String messageBody) {
+  public boolean publish(String messageBody) {
     if (channel == null || !channel.isOpen()) {
       channel = getChannel();
     }
@@ -208,9 +211,13 @@
             message.routingKey,
             properties.getAMQProperties().getBasicProperties(),
             messageBody.getBytes(CharEncoding.UTF_8));
+        return true;
       } catch (IOException ex) {
         LOGGER.error(MSG("Error when sending meessage."), ex);
+        return false;
       }
     }
+    LOGGER.error(MSG("Cannot open channel."));
+    return false;
   }
 }