Use queue to hold Events during connection glitches

Change-Id: I262ae8cf5800fea1a8c9ecdd610e385cc5ccf7ff
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 6c5f450..81ad042 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.rabbitmq.message;
 
+import com.google.gerrit.common.EventListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
@@ -21,15 +22,20 @@
 import com.google.inject.assistedinject.Assisted;
 
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.AMQP;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
 import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
 import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactoryProvider;
+import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class MessagePublisher implements Publisher, LifecycleListener {
 
@@ -37,24 +43,84 @@
 
   private final static int MONITOR_FIRSTTIME_DELAY = 15000;
 
+  private static final int MAX_EVENTS = 16384;
   private final Session session;
   private final Properties properties;
   private final Gson gson;
   private final Timer monitorTimer = new Timer();
   private boolean available = true;
+  private EventListener eventListener;
+  private final LinkedBlockingQueue<Event> queue =
+      new LinkedBlockingQueue<>(MAX_EVENTS);
+  private CancelableRunnable publisher;
+  private Thread publisherThread;
 
   @Inject
   public MessagePublisher(
-      @Assisted Properties properties,
+      @Assisted final Properties properties,
       SessionFactoryProvider sessionFactoryProvider,
       Gson gson) {
     this.session = sessionFactoryProvider.get().create(properties);
     this.properties = properties;
     this.gson = gson;
+    this.eventListener = new EventListener() {
+      @Override
+      public void onEvent(Event event) {
+        try {
+          if (!publisherThread.isAlive()) {
+            publisherThread.start();
+          }
+          queue.put(event);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Failed to queue event", e);
+        }
+      }
+    };
+    this.publisher = new CancelableRunnable() {
+
+      boolean canceled = false;
+
+      @Override
+      public void run() {
+        while (!canceled) {
+          try {
+            if (isEnable() && session.isOpen()) {
+              Event event = queue.poll(200, TimeUnit.MILLISECONDS);
+              if (event != null) {
+                if (isEnable() && session.isOpen()) {
+                  publishEvent(event);
+                } else {
+                  queue.put(event);
+                }
+              }
+            } else {
+              Thread.sleep(1000);
+            }
+          } catch (InterruptedException e) {
+            LOGGER.warn("Interupted while taking event", e);
+          }
+        }
+      }
+
+      @Override
+      public void cancel() {
+        this.canceled = true;
+      }
+
+      @Override
+      public String toString() {
+        return "Rabbitmq publisher: "
+            + properties.getSection(Gerrit.class).listenAs
+            + "-"
+            + properties.getSection(AMQP.class).uri;
+      }
+    };
   }
 
   @Override
   public void start() {
+    publisherThread = new Thread(publisher);
+    publisherThread.start();
     if (!session.isOpen()) {
       session.connect();
       monitorTimer.schedule(new TimerTask() {
@@ -73,18 +139,19 @@
   @Override
   public void stop() {
     monitorTimer.cancel();
+    publisher.cancel();
+    if (publisherThread != null) {
+      try {
+        publisherThread.join();
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
     session.disconnect();
     available = false;
   }
 
   @Override
-  public void onEvent(Event event) {
-    if (available && session.isOpen()) {
-      session.publish(gson.toJson(event));
-    }
-  }
-
-  @Override
   public void enable() {
     available = true;
   }
@@ -113,4 +180,13 @@
   public String getName() {
     return properties.getName();
   }
+
+  @Override
+  public EventListener getEventListener() {
+    return this.eventListener;
+  }
+
+  private void publishEvent(Event event) {
+    session.publish(gson.toJson(event));
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index 8cc38cb..e6eaae3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -5,7 +5,7 @@
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
 import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
 
-public interface Publisher extends EventListener {
+public interface Publisher {
   public void start();
   public void stop();
   public void enable();
@@ -14,4 +14,5 @@
   public Session getSession();
   public Properties getProperties();
   public String getName();
+  public EventListener getEventListener();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
index 45e0e97..5241ce3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
@@ -56,7 +56,7 @@
   @Override
   public void onEvent(Event event) {
     for (Publisher publisher : publishers) {
-      publisher.onEvent(event);
+      publisher.getEventListener().onEvent(event);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
index 2beee41..76ba4e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
@@ -111,7 +111,7 @@
           }
 
           IdentifiedUser user = userFactory.create(userAccount.getId());
-          source.addEventListener(publisher, user);
+          source.addEventListener(publisher.getEventListener(), user);
           LOGGER.info("Listen events as : {}", userName);
         } catch (OrmException e) {
           LOGGER.error("Could not query database for listenAs", e);
@@ -129,7 +129,7 @@
 
   @Override
   public void removePublisher(final Publisher publisher) {
-    source.removeEventListener(publisher);
+    source.removeEventListener(publisher.getEventListener());
   }
 
   @Override