MessagePublisher: Threads cannot be restarted

If publisherThread is not alive when a new event arrives we currently
try to restart the dead Thread which results in a
IllegalThreadStateException.
If publisherThread has died, create a new Thread and start it.

Synchronize the start-new-thread logic to avoid race conditions
that potentially could start new threads unnecessarily and leave
existing, live, threads orphaned.

Change-Id: I772a3275b16be7f2fb31c141ad9a71f162d53ce1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index e93f6fc..43715d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -65,8 +65,9 @@
           @Override
           public void onEvent(Event event) {
             if (!publisherThread.isAlive()) {
-              publisherThread.start();
+              ensurePublisherThreadStarted();
             }
+
             if (queue.offer(event)) {
               if (lostEventCount > 0) {
                 LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
@@ -120,9 +121,7 @@
 
   @Override
   public void start() {
-    publisherThread = new Thread(publisher);
-    publisherThread.setName("rabbitmq-publisher");
-    publisherThread.start();
+    ensurePublisherThreadStarted();
     if (!isConnected()) {
       connect();
       monitorTimer.schedule(
@@ -184,4 +183,13 @@
       }
     }
   }
+
+  private synchronized void ensurePublisherThreadStarted() {
+    if (publisherThread == null || !publisherThread.isAlive()) {
+      LOGGER.info("Creating new publisher thread.");
+      publisherThread = new Thread(publisher);
+      publisherThread.setName("rabbitmq-publisher");
+      publisherThread.start();
+    }
+  }
 }