MessagePublisher: Threads cannot be restarted
If publisherThread is not alive when a new event arrives we currently
try to restart the dead Thread which results in a
IllegalThreadStateException.
If publisherThread has died, create a new Thread and start it.
Synchronize the start-new-thread logic to avoid race conditions
that potentially could start new threads unnecessarily and leave
existing, live, threads orphaned.
Change-Id: I772a3275b16be7f2fb31c141ad9a71f162d53ce1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index e93f6fc..43715d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -65,8 +65,9 @@
@Override
public void onEvent(Event event) {
if (!publisherThread.isAlive()) {
- publisherThread.start();
+ ensurePublisherThreadStarted();
}
+
if (queue.offer(event)) {
if (lostEventCount > 0) {
LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
@@ -120,9 +121,7 @@
@Override
public void start() {
- publisherThread = new Thread(publisher);
- publisherThread.setName("rabbitmq-publisher");
- publisherThread.start();
+ ensurePublisherThreadStarted();
if (!isConnected()) {
connect();
monitorTimer.schedule(
@@ -184,4 +183,13 @@
}
}
}
+
+ private synchronized void ensurePublisherThreadStarted() {
+ if (publisherThread == null || !publisherThread.isAlive()) {
+ LOGGER.info("Creating new publisher thread.");
+ publisherThread = new Thread(publisher);
+ publisherThread.setName("rabbitmq-publisher");
+ publisherThread.start();
+ }
+ }
}