Don't block plugin unload when no events available.

Add "End Of Stream" Event if queue is empty.

CancelableRunnable is only supposed to be canceled if not started yet.
Add new interface GracefullyCancelableRunnable.

Change-Id: Id953904961fd18054db53bd5f3ddbe707b31ebaa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 43715d0..06da71c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -17,7 +17,6 @@
 import com.google.gerrit.common.EventListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -36,10 +35,11 @@
 public class MessagePublisher implements Publisher, LifecycleListener {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
-
-  private static final int MONITOR_FIRSTTIME_DELAY = 15000;
-
   private static final int MAX_EVENTS = 16384;
+  private static final int MONITOR_FIRSTTIME_DELAY = 15000;
+  private static final String END_OF_STREAM = "END-OF-STREAM_$F7;XTSUQ(Dv#N6]g+gd,,uzRp%G-P";
+  private static final Event EOS = new Event(END_OF_STREAM) {};
+
   private final Session session;
   private final Properties properties;
   private final Gson gson;
@@ -47,7 +47,7 @@
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
   private final Object sessionMon = new Object();
   private EventListener eventListener;
-  private CancelableRunnable publisher;
+  private GracefullyCancelableRunnable publisher;
   private Thread publisherThread;
 
   @Inject
@@ -81,15 +81,18 @@
           }
         };
     this.publisher =
-        new CancelableRunnable() {
+        new GracefullyCancelableRunnable() {
 
-          boolean canceled = false;
+          volatile boolean canceled = false;
 
           @Override
           public void run() {
             while (!canceled) {
               try {
                 Event event = queue.take();
+                if (event.getType().equals(END_OF_STREAM)) {
+                  continue;
+                }
                 while (!isConnected() && !canceled) {
                   synchronized (sessionMon) {
                     sessionMon.wait(1000);
@@ -106,7 +109,10 @@
 
           @Override
           public void cancel() {
-            this.canceled = true;
+            canceled = true;
+            if (queue.isEmpty()) {
+              queue.offer(EOS);
+            }
           }
 
           @Override
@@ -192,4 +198,9 @@
       publisherThread.start();
     }
   }
+  /** Runnable that can be gracefully canceled while running. */
+  private interface GracefullyCancelableRunnable extends Runnable {
+    /** Gracefully cancels the Runnable after completing ongoing task. */
+    public void cancel();
+  }
 }