Merge "Merge branch 'stable-2.15' into stable-2.16" into stable-2.16
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index c996e1d..0a35f54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -18,7 +18,6 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
@@ -36,9 +35,11 @@
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private static final int MONITOR_FIRSTTIME_DELAY = 15000;
-
   private static final int MAX_EVENTS = 16384;
+  private static final int MONITOR_FIRSTTIME_DELAY = 15000;
+  private static final String END_OF_STREAM = "END-OF-STREAM_$F7;XTSUQ(Dv#N6]g+gd,,uzRp%G-P";
+  private static final Event EOS = new Event(END_OF_STREAM) {};
+
   private final Session session;
   private final Properties properties;
   private final Gson gson;
@@ -46,7 +47,7 @@
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
   private final Object sessionMon = new Object();
   private EventListener eventListener;
-  private CancelableRunnable publisher;
+  private GracefullyCancelableRunnable publisher;
   private Thread publisherThread;
 
   @Inject
@@ -64,8 +65,9 @@
           @Override
           public void onEvent(Event event) {
             if (!publisherThread.isAlive()) {
-              publisherThread.start();
+              ensurePublisherThreadStarted();
             }
+
             if (queue.offer(event)) {
               if (lostEventCount > 0) {
                 logger.atWarning().log(
@@ -80,15 +82,18 @@
           }
         };
     this.publisher =
-        new CancelableRunnable() {
+        new GracefullyCancelableRunnable() {
 
-          boolean canceled = false;
+          volatile boolean canceled = false;
 
           @Override
           public void run() {
             while (!canceled) {
               try {
                 Event event = queue.take();
+                if (event.getType().equals(END_OF_STREAM)) {
+                  continue;
+                }
                 while (!isConnected() && !canceled) {
                   synchronized (sessionMon) {
                     sessionMon.wait(1000);
@@ -106,7 +111,10 @@
 
           @Override
           public void cancel() {
-            this.canceled = true;
+            canceled = true;
+            if (queue.isEmpty()) {
+              queue.offer(EOS);
+            }
           }
 
           @Override
@@ -121,9 +129,7 @@
 
   @Override
   public void start() {
-    publisherThread = new Thread(publisher);
-    publisherThread.setName("rabbitmq-publisher");
-    publisherThread.start();
+    ensurePublisherThreadStarted();
     if (!isConnected()) {
       connect();
       monitorTimer.schedule(
@@ -185,4 +191,18 @@
       }
     }
   }
+
+  private synchronized void ensurePublisherThreadStarted() {
+    if (publisherThread == null || !publisherThread.isAlive()) {
+      logger.atInfo().log("Creating new publisher thread.");
+      publisherThread = new Thread(publisher);
+      publisherThread.setName("rabbitmq-publisher");
+      publisherThread.start();
+    }
+  }
+  /** Runnable that can be gracefully canceled while running. */
+  private interface GracefullyCancelableRunnable extends Runnable {
+    /** Gracefully cancels the Runnable after completing ongoing task. */
+    public void cancel();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index c1ac32c..c77ed05 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -189,8 +189,8 @@
         logger.atInfo().log(MSG("Closing Connection..."));
         connection.close();
       }
-    } catch (IOException ex) {
-      logger.atSevere().withCause(ex).log(MSG("Error when closing connection."));
+    } catch (IOException | ShutdownSignalException ex) {
+      logger.atWarning().withCause(ex).log(MSG("Error when closing connection."));
     } finally {
       connection = null;
     }