Improve connection handling and log
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index b28dabd..d93563d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -50,7 +50,7 @@
}
public void connect() {
- LOGGER.info("Connect to " + properties.getString(Keys.AMQP_URI) + "...");
+ LOGGER.info("Connect to {}...", properties.getString(Keys.AMQP_URI));
ConnectionFactory factory = new ConnectionFactory();
try {
if (StringUtils.isNotEmpty(properties.getString(Keys.AMQP_URI))) {
@@ -65,62 +65,92 @@
connection.addShutdownListener(this);
LOGGER.info("Connection established.");
}
- bind();
+ setUp();
} catch (URISyntaxException ex) {
- LOGGER.error("URI syntax error: " + properties.getString(Keys.AMQP_URI));
+ LOGGER.error("URI syntax error: {}", properties.getString(Keys.AMQP_URI));
} catch (IOException ex) {
LOGGER.error("Connection cannot be opened.");
} catch (Exception ex) {
- LOGGER.warn("#connect: " + ex.getClass().getName());
+ LOGGER.warn("Connection has something error. it will be disposed.", ex);
}
}
- private void bind() {
+ private void setUp() {
if (connection != null && publishChannel == null) {
try {
- Channel ch = connection.createChannel();
- LOGGER.info("Channel is opened.");
+ Channel ch;
+ ch = connection.createChannel();
+ LOGGER.info("Open Channel.");
+
if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
LOGGER.info("Declare queue...");
if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
- LOGGER.info("Declare queue: " + properties.getString(Keys.QUEUE_NAME));
- ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
- properties.getBoolean(Keys.QUEUE_DURABLE),
- properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
- properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
- }
-
- if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
- LOGGER.info("Declare exchange...");
- if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- LOGGER.info("Declare exchange: " + properties.getString(Keys.EXCHANGE_NAME));
- ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.EXCHANGE_TYPE),
- properties.getBoolean(Keys.EXCHANGE_DURABLE),
- properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
+ try {
+ ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
+ LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
+ } catch (Exception ex) {
+ createQueue(ch);
}
}
-
- if (properties.getBoolean(Keys.BIND_STARTUP)) {
- if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
- StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- LOGGER.info("Bind exchange and queue with key: " + properties.getString(Keys.BIND_ROUTINGKEY));
- ch.queueBind(properties.getString(Keys.QUEUE_NAME),
- properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.BIND_ROUTINGKEY));
- }
- }
-
- publishChannel = ch;
- LOGGER.info("Complete to setup channel.");
}
+
+ if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
+ LOGGER.info("Declare exchange...");
+ if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+ try {
+ ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
+ LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
+ } catch (Exception ex) {
+ createExchange(ch);
+ }
+ }
+ }
+
+ if (properties.getBoolean(Keys.BIND_STARTUP)) {
+ if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
+ StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+ LOGGER.info("Bind exchange and queue with key: {}", properties.getString(Keys.BIND_ROUTINGKEY));
+ ch.queueBind(properties.getString(Keys.QUEUE_NAME),
+ properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.BIND_ROUTINGKEY));
+ }
+ }
+
+ publishChannel = ch;
+ LOGGER.info("Complete to setup channel.");
} catch (Exception ex) {
- LOGGER.warn("#bind: " + ex.getClass().getName());
+ LOGGER.warn("Channel has something error. Connection will be disconnected.", ex);
disconnect();
}
}
}
+ public void createQueue(Channel ch) throws IOException {
+ LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
+ ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
+ properties.getBoolean(Keys.QUEUE_DURABLE),
+ properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
+ properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
+ }
+
+ public void createExchange(Channel ch) throws IOException {
+ LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
+ ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.EXCHANGE_TYPE),
+ properties.getBoolean(Keys.EXCHANGE_DURABLE),
+ properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
+ }
+
+ public void bind(Channel ch) throws IOException {
+ LOGGER.info("Bind exchange \"{}\" and queue \"{}\"with key: {}", new Object[]{
+ properties.getString(Keys.QUEUE_NAME),
+ properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.BIND_ROUTINGKEY)});
+ ch.queueBind(properties.getString(Keys.QUEUE_NAME),
+ properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.BIND_ROUTINGKEY));
+ }
+
public void disconnect() {
LOGGER.info("Disconnecting...");
try {
@@ -128,7 +158,8 @@
connection.close();
}
} catch (Exception ex) {
- LOGGER.warn("#disconnect: " + ex.getClass().getName());
+ LOGGER.warn("Error when close connection." , ex);
+ } finally {
connection = null;
publishChannel = null;
}
@@ -143,15 +174,34 @@
properties.getBasicProperties(),
message.getBytes(CharEncoding.UTF_8));
} catch (Exception ex) {
- LOGGER.warn("#sendMessage: " + ex.getClass().getName());
+ LOGGER.warn("Error when sending meessage.", ex);
}
}
}
@Override
- public void shutdownCompleted(ShutdownSignalException arg0) {
- LOGGER.info("Disconnected.");
- connection = null;
- publishChannel = null;
+ public void shutdownCompleted(ShutdownSignalException exception) {
+ Object obj = exception.getReference();
+
+ if (obj instanceof Channel) {
+ LOGGER.info("Channel closed.");
+ publishChannel = null;
+ if (!exception.isInitiatedByApplication()) {
+ if (connection != null && connection.isOpen()) {
+ try {
+ LOGGER.info("Reopen channel...");
+ publishChannel = connection.createChannel();
+ } catch (Exception ex) {
+ LOGGER.warn("Cannot reopen channel. Connection will be disconnected.", ex);
+ disconnect();
+ }
+ }
+ }
+ } else if (obj instanceof Connection) {
+ LOGGER.info("Connection disconnected.");
+ if (!exception.isInitiatedByApplication()) {
+ connection = null;
+ }
+ }
}
}