Merge branch 'stable-2.16'
* stable-2.16:
AMQPSession: Format with google-java-format
Don't block plugin unload when no events available.
Catch ShutdownSignalException when closing connection
MessagePublisher: Threads cannot be restarted
Change-Id: I67864faa188593135466105f27ea280cd2052728
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index c996e1d..0a35f54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -18,7 +18,6 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -36,9 +35,11 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final int MONITOR_FIRSTTIME_DELAY = 15000;
-
private static final int MAX_EVENTS = 16384;
+ private static final int MONITOR_FIRSTTIME_DELAY = 15000;
+ private static final String END_OF_STREAM = "END-OF-STREAM_$F7;XTSUQ(Dv#N6]g+gd,,uzRp%G-P";
+ private static final Event EOS = new Event(END_OF_STREAM) {};
+
private final Session session;
private final Properties properties;
private final Gson gson;
@@ -46,7 +47,7 @@
private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
private final Object sessionMon = new Object();
private EventListener eventListener;
- private CancelableRunnable publisher;
+ private GracefullyCancelableRunnable publisher;
private Thread publisherThread;
@Inject
@@ -64,8 +65,9 @@
@Override
public void onEvent(Event event) {
if (!publisherThread.isAlive()) {
- publisherThread.start();
+ ensurePublisherThreadStarted();
}
+
if (queue.offer(event)) {
if (lostEventCount > 0) {
logger.atWarning().log(
@@ -80,15 +82,18 @@
}
};
this.publisher =
- new CancelableRunnable() {
+ new GracefullyCancelableRunnable() {
- boolean canceled = false;
+ volatile boolean canceled = false;
@Override
public void run() {
while (!canceled) {
try {
Event event = queue.take();
+ if (event.getType().equals(END_OF_STREAM)) {
+ continue;
+ }
while (!isConnected() && !canceled) {
synchronized (sessionMon) {
sessionMon.wait(1000);
@@ -106,7 +111,10 @@
@Override
public void cancel() {
- this.canceled = true;
+ canceled = true;
+ if (queue.isEmpty()) {
+ queue.offer(EOS);
+ }
}
@Override
@@ -121,9 +129,7 @@
@Override
public void start() {
- publisherThread = new Thread(publisher);
- publisherThread.setName("rabbitmq-publisher");
- publisherThread.start();
+ ensurePublisherThreadStarted();
if (!isConnected()) {
connect();
monitorTimer.schedule(
@@ -185,4 +191,18 @@
}
}
}
+
+ private synchronized void ensurePublisherThreadStarted() {
+   // A finished Thread cannot be started again, so a missing or dead one is replaced.
+   if (!(publisherThread != null && publisherThread.isAlive())) {
+     logger.atInfo().log("Creating new publisher thread.");
+     publisherThread = new Thread(publisher, "rabbitmq-publisher");
+     publisherThread.start();
+   }
+ }
+ /** Runnable that can be asked to stop gracefully while it is running. */
+ private interface GracefullyCancelableRunnable extends Runnable {
+   /** Requests cancellation; the Runnable stops after completing its ongoing task. */
+   void cancel();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index a1dd775..c77ed05 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -125,7 +125,8 @@
failureCount.incrementAndGet();
}
if (failureCount.get() > properties.getSection(Monitor.class).failureCount) {
- logger.atWarning().log("Creating channel failed %d times, closing connection.", failureCount.get());
+ logger.atWarning().log(
+ "Creating channel failed %d times, closing connection.", failureCount.get());
disconnect();
}
}
@@ -188,8 +189,8 @@
logger.atInfo().log(MSG("Closing Connection..."));
connection.close();
}
- } catch (IOException ex) {
- logger.atSevere().withCause(ex).log(MSG("Error when closing connection."));
+ } catch (IOException | ShutdownSignalException ex) {
+ logger.atWarning().withCause(ex).log(MSG("Error when closing connection."));
} finally {
connection = null;
}