Fix shutdown channel/connection logic
In shutdown process, message in log is a bit strange.
To fix this, this patch adds innter class for safety shutdown.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
index fdfa72d..ca42547 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
@@ -27,6 +27,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownNotifier;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.codec.CharEncoding;
@@ -37,7 +38,51 @@
import java.io.IOException;
import java.net.URISyntaxException;
-public final class AMQPSession implements Session, ShutdownListener {
+public final class AMQPSession implements Session {
+
+ private class ShutdownListenerImpl implements ShutdownListener {
+
+ private final Class<?> clazz;
+
+ public <T extends ShutdownNotifier> ShutdownListenerImpl(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ @Override
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ if (cause != null) {
+ Object obj = cause.getReference();
+ if (Channel.class.isInstance(obj)) {
+ Channel.class.cast(obj).removeShutdownListener(this);
+ } else if (Connection.class.isInstance(obj)) {
+ Connection.class.cast(obj).removeShutdownListener(this);
+ }
+ if (clazz.isInstance(obj)) {
+ if (clazz == Channel.class) {
+ Channel ch = Channel.class.cast(obj);
+ if (cause.isInitiatedByApplication()) {
+ LOGGER.info(MSG("Channel #{} closed."), ch.getChannelNumber());
+ } else {
+ LOGGER.info(MSG("Channel #{} suddenly closed."), ch.getChannelNumber());
+ }
+ if (ch.equals(AMQPSession.this.channel)) {
+ AMQPSession.this.channel = null;
+ }
+ } else if (clazz == Connection.class) {
+ Connection conn = Connection.class.cast(obj);
+ if (cause.isInitiatedByApplication()) {
+ LOGGER.info(MSG("Connection closed."));
+ } else {
+ LOGGER.info(MSG("Connection suddenly closed."));
+ }
+ if (conn.equals(AMQPSession.this.connection)) {
+ AMQPSession.this.connection = null;
+ }
+ }
+ }
+ }
+ }
+ }
private static final Logger LOGGER = LoggerFactory.getLogger(AMQPSession.class);
private final Properties properties;
@@ -45,6 +90,9 @@
private volatile Channel channel;
private volatile int failureCount = 0;
+ private final ShutdownListener connectionListener = new ShutdownListenerImpl(Connection.class);
+ private final ShutdownListener channelListener = new ShutdownListenerImpl(Channel.class);
+
@Inject
public AMQPSession(@Assisted Properties properties) {
this.properties = properties;
@@ -69,7 +117,7 @@
} else {
try {
ch = connection.createChannel();
- ch.addShutdownListener(this);
+ ch.addShutdownListener(channelListener);
failureCount = 0;
LOGGER.info(MSG("Channel #{} opened."), ch.getChannelNumber());
} catch (Exception ex) {
@@ -103,7 +151,7 @@
factory.setPassword(amqp.password);
}
connection = factory.newConnection();
- connection.addShutdownListener(this);
+ connection.addShutdownListener(connectionListener);
LOGGER.info(MSG("Connection established."));
}
} catch (URISyntaxException ex) {
@@ -159,24 +207,4 @@
}
}
}
-
- @Override
- public void shutdownCompleted(ShutdownSignalException exception) {
- if (exception.isHardError()) {
- if (exception.isInitiatedByApplication()) {
- LOGGER.info(MSG("Connection closed."));
- } else {
- LOGGER.info(MSG("Connection suddenly closed."));
- connection = null;
- }
- } else {
- Channel ch = (Channel) exception.getReference();
- if (exception.isInitiatedByApplication()) {
- LOGGER.info(MSG("Channel #{} closed."), ch.getChannelNumber());
- } else {
- LOGGER.info(MSG("Channel #{} suddenly closed."), ch.getChannelNumber());
- channel = null;
- }
- }
- }
}