Fix shutdown channel/connection logic

In shutdown process, message in log is a bit strange.
To fix this, this patch adds innter class for safety shutdown.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
index fdfa72d..ca42547 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
@@ -27,6 +27,7 @@
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.ShutdownListener;
+import com.rabbitmq.client.ShutdownNotifier;
 import com.rabbitmq.client.ShutdownSignalException;
 
 import org.apache.commons.codec.CharEncoding;
@@ -37,7 +38,51 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 
-public final class AMQPSession implements Session, ShutdownListener {
+public final class AMQPSession implements Session {
+
+  private class ShutdownListenerImpl implements ShutdownListener {
+
+    private final Class<?> clazz;
+
+    public <T extends ShutdownNotifier> ShutdownListenerImpl(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void shutdownCompleted(ShutdownSignalException cause) {
+      if (cause != null) {
+        Object obj = cause.getReference();
+        if (Channel.class.isInstance(obj)) {
+          Channel.class.cast(obj).removeShutdownListener(this);
+        } else if (Connection.class.isInstance(obj)) {
+          Connection.class.cast(obj).removeShutdownListener(this);
+        }
+        if (clazz.isInstance(obj)) {
+          if (clazz == Channel.class) {
+            Channel ch = Channel.class.cast(obj);
+            if (cause.isInitiatedByApplication()) {
+              LOGGER.info(MSG("Channel #{} closed."), ch.getChannelNumber());
+            } else {
+              LOGGER.info(MSG("Channel #{} suddenly closed."), ch.getChannelNumber());
+            }
+            if (ch.equals(AMQPSession.this.channel)) {
+              AMQPSession.this.channel = null;
+            }
+          } else if (clazz == Connection.class) {
+            Connection conn = Connection.class.cast(obj);
+            if (cause.isInitiatedByApplication()) {
+              LOGGER.info(MSG("Connection closed."));
+            } else {
+              LOGGER.info(MSG("Connection suddenly closed."));
+            }
+            if (conn.equals(AMQPSession.this.connection)) {
+              AMQPSession.this.connection = null;
+            }
+          }
+        }
+      }
+    }
+  }
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AMQPSession.class);
   private final Properties properties;
@@ -45,6 +90,9 @@
   private volatile Channel channel;
   private volatile int failureCount = 0;
 
+  private final ShutdownListener connectionListener = new ShutdownListenerImpl(Connection.class);
+  private final ShutdownListener channelListener = new ShutdownListenerImpl(Channel.class);
+
   @Inject
   public AMQPSession(@Assisted Properties properties) {
     this.properties = properties;
@@ -69,7 +117,7 @@
     } else {
       try {
         ch = connection.createChannel();
-        ch.addShutdownListener(this);
+        ch.addShutdownListener(channelListener);
         failureCount = 0;
         LOGGER.info(MSG("Channel #{} opened."), ch.getChannelNumber());
       } catch (Exception ex) {
@@ -103,7 +151,7 @@
           factory.setPassword(amqp.password);
         }
         connection = factory.newConnection();
-        connection.addShutdownListener(this);
+        connection.addShutdownListener(connectionListener);
         LOGGER.info(MSG("Connection established."));
       }
     } catch (URISyntaxException ex) {
@@ -159,24 +207,4 @@
       }
     }
   }
-
-  @Override
-  public void shutdownCompleted(ShutdownSignalException exception) {
-    if (exception.isHardError()) {
-      if (exception.isInitiatedByApplication()) {
-        LOGGER.info(MSG("Connection closed."));
-      } else {
-        LOGGER.info(MSG("Connection suddenly closed."));
-        connection = null;
-      }
-    } else {
-      Channel ch = (Channel) exception.getReference();
-      if (exception.isInitiatedByApplication()) {
-        LOGGER.info(MSG("Channel #{} closed."), ch.getChannelNumber());
-      } else {
-        LOGGER.info(MSG("Channel #{} suddenly closed."), ch.getChannelNumber());
-        channel = null;
-      }
-    }
-  }
 }