Fix channel handling
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index d93563d..713b25a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -36,6 +36,7 @@
   private final Properties properties;
   private Connection connection;
   private Channel publishChannel;
+  private volatile int failureCount = 0;
 
   @Inject
   public AMQPSession(Properties properties) {
@@ -49,6 +50,25 @@
     return false;
   }
 
+  private Channel getPublishChannel() {
+    Channel pubCh = null;
+    if (connection != null) {
+      try {
+        pubCh = connection.createChannel();
+        pubCh.addShutdownListener(this);
+        failureCount = 0;
+      } catch (Exception ex) {
+        LOGGER.warn("Failed to open publish channel.");
+        failureCount++;
+      }
+      if (failureCount > properties.getInt(Keys.MONITOR_FAILURECOUNT)) {
+        LOGGER.warn("Connection has something wrong. So will be disconnected.");
+        disconnect();
+      }
+    }
+    return pubCh;
+  }
+
   public void connect() {
     LOGGER.info("Connect to {}...", properties.getString(Keys.AMQP_URI));
     ConnectionFactory factory = new ConnectionFactory();
@@ -76,79 +96,99 @@
   }
 
   private void setUp() {
-    if (connection != null && publishChannel == null) {
+    if (connection != null) {
+      if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
+        LOGGER.info("Declare queue...");
+        if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
+          createQueue();
+        }
+      }
+
+      if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
+        LOGGER.info("Declare exchange...");
+        if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+          createExchange();
+        }
+      }
+
+      if (properties.getBoolean(Keys.BIND_STARTUP)) {
+        if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
+            StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+          bind();
+        }
+      }
+      LOGGER.info("Complete to setup channel.");
+    }
+  }
+
+  public void createQueue() {
+    Channel ch;
+    boolean needDeclaration = false;
+    try {
+      ch = connection.createChannel();
+      ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
+      ch.close();
+      LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
+    } catch (Exception ex) {
+      needDeclaration = true;
+    }
+
+    if (needDeclaration) {
+      LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
       try {
-        Channel ch;
         ch = connection.createChannel();
-        LOGGER.info("Open Channel.");
-
-        if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
-          LOGGER.info("Declare queue...");
-          if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
-            try {
-              ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
-              LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
-            } catch (Exception ex) {
-              createQueue(ch);
-            }
-          }
-        }
-
-        if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
-          LOGGER.info("Declare exchange...");
-          if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
-            try {
-              ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
-              LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
-            } catch (Exception ex) {
-              createExchange(ch);
-            }
-          }
-        }
-
-        if (properties.getBoolean(Keys.BIND_STARTUP)) {
-          if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
-              StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
-            LOGGER.info("Bind exchange and queue with key: {}", properties.getString(Keys.BIND_ROUTINGKEY));
-            ch.queueBind(properties.getString(Keys.QUEUE_NAME),
-                properties.getString(Keys.EXCHANGE_NAME),
-                properties.getString(Keys.BIND_ROUTINGKEY));
-          }
-        }
-
-        publishChannel = ch;
-        LOGGER.info("Complete to setup channel.");
+        ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
+            properties.getBoolean(Keys.QUEUE_DURABLE),
+            properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
+            properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
+        ch.close();
       } catch (Exception ex) {
-        LOGGER.warn("Channel has something error. Connection will be disconnected.", ex);
-        disconnect();
+        LOGGER.warn("Failed to declare queue.", ex);
       }
     }
   }
 
-  public void createQueue(Channel ch) throws IOException {
-    LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
-    ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
-        properties.getBoolean(Keys.QUEUE_DURABLE),
-        properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
-        properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
+  public void createExchange() {
+    Channel ch;
+    boolean needDeclaration = false;
+    try {
+      ch = connection.createChannel();
+      ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
+      ch.close();
+      LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
+    } catch (Exception ex) {
+      needDeclaration = true;
+    }
+
+    if (needDeclaration) {
+      LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
+      try {
+        ch = connection.createChannel();
+        ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
+            properties.getString(Keys.EXCHANGE_TYPE),
+            properties.getBoolean(Keys.EXCHANGE_DURABLE),
+            properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
+        ch.close();
+      } catch (Exception ex) {
+        LOGGER.warn("Failed to declare exchange.", ex);
+      }
+    }
   }
 
-  public void createExchange(Channel ch) throws IOException {
-    LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
-    ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
-        properties.getString(Keys.EXCHANGE_TYPE),
-        properties.getBoolean(Keys.EXCHANGE_DURABLE),
-        properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
-  }
-
-  public void bind(Channel ch) throws IOException {
+  public void bind() {
     LOGGER.info("Bind exchange \"{}\" and queue \"{}\"with key: {}", new Object[]{
         properties.getString(Keys.QUEUE_NAME),
         properties.getString(Keys.EXCHANGE_NAME),
         properties.getString(Keys.BIND_ROUTINGKEY)});
-    ch.queueBind(properties.getString(Keys.QUEUE_NAME),
-        properties.getString(Keys.EXCHANGE_NAME),
-        properties.getString(Keys.BIND_ROUTINGKEY));
+    try {
+      Channel ch = connection.createChannel();
+      ch.queueBind(properties.getString(Keys.QUEUE_NAME),
+          properties.getString(Keys.EXCHANGE_NAME),
+          properties.getString(Keys.BIND_ROUTINGKEY));
+      ch.close();
+    } catch (Exception ex) {
+      LOGGER.warn("Failed to declare binding.", ex);
+    }
   }
 
   public void disconnect() {
@@ -166,6 +206,9 @@
   }
 
   public void publishMessage(String message) {
+    if (publishChannel == null) {
+      publishChannel = getPublishChannel();
+    }
     if (publishChannel != null && publishChannel.isOpen()) {
       try {
         LOGGER.debug("Send message.");
@@ -186,22 +229,9 @@
     if (obj instanceof Channel) {
       LOGGER.info("Channel closed.");
       publishChannel = null;
-      if (!exception.isInitiatedByApplication()) {
-        if (connection != null && connection.isOpen()) {
-          try {
-            LOGGER.info("Reopen channel...");
-            publishChannel = connection.createChannel();
-          } catch (Exception ex) {
-            LOGGER.warn("Cannot reopen channel. Connection will be disconnected.", ex);
-            disconnect();
-          }
-        }
-      }
     } else if (obj instanceof Connection) {
       LOGGER.info("Connection disconnected.");
-      if (!exception.isInitiatedByApplication()) {
-        connection = null;
-      }
+      connection = null;
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
index 43eb3b6..7382a14 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
@@ -39,7 +39,8 @@
   GERRIT_PORT("gerrit.port", "gerrit-port", 29418),
   GERRIT_FRONT_URL("gerrit.canonicalWebUrl", "gerrit-front-url", ""),
   GERRIT_VERSION("gerrit.version", "gerrit-version", null),
-  MONITOR_INTERVAL("monitor.interval", null, 15000);
+  MONITOR_INTERVAL("monitor.interval", null, 15000),
+  MONITOR_FAILURECOUNT("monitor.failureCount", null, 15);
 
   public String section;
   public String name;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 35b94f1..0ef4228 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -34,6 +34,7 @@
     port = 24918
   [monitor]
     interval = 15000
+    failureCount = 15
 ```
 
 * `amqp.uri`
@@ -116,6 +117,10 @@
     * The interval time in milliseconds for connection monitor.
       You can specify the value more than 5000.
 
+* `monitor.failureCount`
+    * The count of failure. If the command for publishing message failed in the specified number of times
+      in succession, connection will be renewed.
+
 Default Values
 -----------------
 
@@ -148,3 +153,4 @@
 |gerrit.scheme        | **ssh**
 |gerrit.port          | 29418
 |monitor.interval     | 15000
+|monitor.failureCount | 15