Fix channel handling
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index d93563d..713b25a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -36,6 +36,7 @@
private final Properties properties;
private Connection connection;
private Channel publishChannel;
+ private volatile int failureCount = 0;
@Inject
public AMQPSession(Properties properties) {
@@ -49,6 +50,25 @@
return false;
}
+ private Channel getPublishChannel() {
+ Channel pubCh = null;
+ if (connection != null) {
+ try {
+ pubCh = connection.createChannel();
+ pubCh.addShutdownListener(this);
+ failureCount = 0;
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to open publish channel.");
+ failureCount++;
+ }
+ if (failureCount > properties.getInt(Keys.MONITOR_FAILURECOUNT)) {
+ LOGGER.warn("Connection has something wrong. So will be disconnected.");
+ disconnect();
+ }
+ }
+ return pubCh;
+ }
+
public void connect() {
LOGGER.info("Connect to {}...", properties.getString(Keys.AMQP_URI));
ConnectionFactory factory = new ConnectionFactory();
@@ -76,79 +96,99 @@
}
private void setUp() {
- if (connection != null && publishChannel == null) {
+ if (connection != null) {
+ if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
+ LOGGER.info("Declare queue...");
+ if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
+ createQueue();
+ }
+ }
+
+ if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
+ LOGGER.info("Declare exchange...");
+ if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+ createExchange();
+ }
+ }
+
+ if (properties.getBoolean(Keys.BIND_STARTUP)) {
+ if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
+ StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
+ bind();
+ }
+ }
+ LOGGER.info("Complete to setup channel.");
+ }
+ }
+
+ public void createQueue() {
+ Channel ch;
+ boolean needDeclaration = false;
+ try {
+ ch = connection.createChannel();
+ ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
+ ch.close();
+ LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
+ } catch (Exception ex) {
+ needDeclaration = true;
+ }
+
+ if (needDeclaration) {
+ LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
try {
- Channel ch;
ch = connection.createChannel();
- LOGGER.info("Open Channel.");
-
- if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
- LOGGER.info("Declare queue...");
- if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
- try {
- ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
- LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
- } catch (Exception ex) {
- createQueue(ch);
- }
- }
- }
-
- if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
- LOGGER.info("Declare exchange...");
- if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- try {
- ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
- LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
- } catch (Exception ex) {
- createExchange(ch);
- }
- }
- }
-
- if (properties.getBoolean(Keys.BIND_STARTUP)) {
- if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
- StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- LOGGER.info("Bind exchange and queue with key: {}", properties.getString(Keys.BIND_ROUTINGKEY));
- ch.queueBind(properties.getString(Keys.QUEUE_NAME),
- properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.BIND_ROUTINGKEY));
- }
- }
-
- publishChannel = ch;
- LOGGER.info("Complete to setup channel.");
+ ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
+ properties.getBoolean(Keys.QUEUE_DURABLE),
+ properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
+ properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
+ ch.close();
} catch (Exception ex) {
- LOGGER.warn("Channel has something error. Connection will be disconnected.", ex);
- disconnect();
+ LOGGER.warn("Failed to declare queue.", ex);
}
}
}
- public void createQueue(Channel ch) throws IOException {
- LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
- ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
- properties.getBoolean(Keys.QUEUE_DURABLE),
- properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
- properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
+ public void createExchange() {
+ Channel ch;
+ boolean needDeclaration = false;
+ try {
+ ch = connection.createChannel();
+ ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
+ ch.close();
+ LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
+ } catch (Exception ex) {
+ needDeclaration = true;
+ }
+
+ if (needDeclaration) {
+ LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
+ try {
+ ch = connection.createChannel();
+ ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.EXCHANGE_TYPE),
+ properties.getBoolean(Keys.EXCHANGE_DURABLE),
+ properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
+ ch.close();
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to declare exchange.", ex);
+ }
+ }
}
- public void createExchange(Channel ch) throws IOException {
- LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
- ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.EXCHANGE_TYPE),
- properties.getBoolean(Keys.EXCHANGE_DURABLE),
- properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
- }
-
- public void bind(Channel ch) throws IOException {
+ public void bind() {
LOGGER.info("Bind exchange \"{}\" and queue \"{}\"with key: {}", new Object[]{
properties.getString(Keys.QUEUE_NAME),
properties.getString(Keys.EXCHANGE_NAME),
properties.getString(Keys.BIND_ROUTINGKEY)});
- ch.queueBind(properties.getString(Keys.QUEUE_NAME),
- properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.BIND_ROUTINGKEY));
+ try {
+ Channel ch = connection.createChannel();
+ ch.queueBind(properties.getString(Keys.QUEUE_NAME),
+ properties.getString(Keys.EXCHANGE_NAME),
+ properties.getString(Keys.BIND_ROUTINGKEY));
+ ch.close();
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to declare binding.", ex);
+ }
}
public void disconnect() {
@@ -166,6 +206,9 @@
}
public void publishMessage(String message) {
+ if (publishChannel == null) {
+ publishChannel = getPublishChannel();
+ }
if (publishChannel != null && publishChannel.isOpen()) {
try {
LOGGER.debug("Send message.");
@@ -186,22 +229,9 @@
if (obj instanceof Channel) {
LOGGER.info("Channel closed.");
publishChannel = null;
- if (!exception.isInitiatedByApplication()) {
- if (connection != null && connection.isOpen()) {
- try {
- LOGGER.info("Reopen channel...");
- publishChannel = connection.createChannel();
- } catch (Exception ex) {
- LOGGER.warn("Cannot reopen channel. Connection will be disconnected.", ex);
- disconnect();
- }
- }
- }
} else if (obj instanceof Connection) {
LOGGER.info("Connection disconnected.");
- if (!exception.isInitiatedByApplication()) {
- connection = null;
- }
+ connection = null;
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
index 43eb3b6..7382a14 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
@@ -39,7 +39,8 @@
GERRIT_PORT("gerrit.port", "gerrit-port", 29418),
GERRIT_FRONT_URL("gerrit.canonicalWebUrl", "gerrit-front-url", ""),
GERRIT_VERSION("gerrit.version", "gerrit-version", null),
- MONITOR_INTERVAL("monitor.interval", null, 15000);
+ MONITOR_INTERVAL("monitor.interval", null, 15000),
+ MONITOR_FAILURECOUNT("monitor.failureCount", null, 15);
public String section;
public String name;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 35b94f1..0ef4228 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -34,6 +34,7 @@
port = 24918
[monitor]
interval = 15000
+ failureCount = 15
```
* `amqp.uri`
@@ -116,6 +117,10 @@
* The interval time in milliseconds for connection monitor.
You can specify the value more than 5000.
+* `monitor.failureCount`
+ * The count of failure. If the command for publishing message failed in the specified number of times
+ in succession, connection will be renewed.
+
Default Values
-----------------
@@ -148,3 +153,4 @@
|gerrit.scheme | **ssh**
|gerrit.port | 29418
|monitor.interval | 15000
+|monitor.failureCount | 15