Remove queue/exchange/bind configuration feature
To dedicate feature to sending/receiveing message, all configration features
regarding queue/exchange/bind in RabbitMQ are removed.
diff --git a/README.md b/README.md
index 034fe39..56ffe4d 100644
--- a/README.md
+++ b/README.md
@@ -62,6 +62,14 @@
hostname = www.foobar.com
```
+If you would like to consume messages in queue, you need to add the below.
+
+```
+[queue]
+ name = queue-for-review
+ consume = true
+```
+
History
---------------------
@@ -81,7 +89,7 @@
* 1.1
* Fix channel handling
* Add property: `monitor.failureCount`
- * Update README and documents
+ * Update README and documents
* 1.0
* First release
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index ce0f729..d625b84 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -103,104 +103,15 @@
private void setUp() {
if (connection != null) {
- if (properties.getBoolean(Keys.QUEUE_DECLARE)) {
- LOGGER.info("Declare queue...");
+ if (properties.getBoolean(Keys.QUEUE_CONSUME)) {
if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
- createQueue();
- }
- }
-
- if (properties.getBoolean(Keys.EXCHANGE_DECLARE)) {
- LOGGER.info("Declare exchange...");
- if (StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- createExchange();
- }
- }
-
- if (properties.getBoolean(Keys.BIND_STARTUP)) {
- if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME)) &&
- StringUtils.isNotEmpty(properties.getString(Keys.EXCHANGE_NAME))) {
- bind();
- }
- }
- if (properties.getBoolean(Keys.CONSUME_ENABLED)) {
- if (StringUtils.isNotEmpty(properties.getString(Keys.CONSUME_QUEUE))) {
+ consumeMessage();
}
}
LOGGER.info("Complete to setup channel.");
}
}
- public void createQueue() {
- Channel ch;
- boolean needDeclaration = false;
- try {
- ch = connection.createChannel();
- ch.queueDeclarePassive(properties.getString(Keys.QUEUE_NAME));
- ch.close();
- LOGGER.info("Queue \"{}\" already exist.", properties.getString(Keys.QUEUE_NAME));
- } catch (Exception ex) {
- needDeclaration = true;
- }
-
- if (needDeclaration) {
- LOGGER.info("Declare queue: {}", properties.getString(Keys.QUEUE_NAME));
- try {
- ch = connection.createChannel();
- ch.queueDeclare(properties.getString(Keys.QUEUE_NAME),
- properties.getBoolean(Keys.QUEUE_DURABLE),
- properties.getBoolean(Keys.QUEUE_EXCLUSIVE),
- properties.getBoolean(Keys.QUEUE_AUTODELETE), null);
- ch.close();
- } catch (Exception ex) {
- LOGGER.warn("Failed to declare queue.", ex);
- }
- }
- }
-
- public void createExchange() {
- Channel ch;
- boolean needDeclaration = false;
- try {
- ch = connection.createChannel();
- ch.exchangeDeclarePassive(properties.getString(Keys.EXCHANGE_NAME));
- ch.close();
- LOGGER.info("Exchange \"{}\" already exist.", properties.getString(Keys.EXCHANGE_NAME));
- } catch (Exception ex) {
- needDeclaration = true;
- }
-
- if (needDeclaration) {
- LOGGER.info("Declare exchange: {}", properties.getString(Keys.EXCHANGE_NAME));
- try {
- ch = connection.createChannel();
- ch.exchangeDeclare(properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.EXCHANGE_TYPE),
- properties.getBoolean(Keys.EXCHANGE_DURABLE),
- properties.getBoolean(Keys.EXCHANGE_AUTODELETE), null);
- ch.close();
- } catch (Exception ex) {
- LOGGER.warn("Failed to declare exchange.", ex);
- }
- }
- }
-
- public void bind() {
- LOGGER.info("Bind exchange \"{}\" and queue \"{}\"with key: {}", new Object[]{
- properties.getString(Keys.QUEUE_NAME),
- properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.BIND_ROUTINGKEY)});
- try {
- Channel ch = connection.createChannel();
- ch.queueBind(properties.getString(Keys.QUEUE_NAME),
- properties.getString(Keys.EXCHANGE_NAME),
- properties.getString(Keys.BIND_ROUTINGKEY));
- ch.close();
- } catch (Exception ex) {
- LOGGER.warn("Failed to declare binding.", ex);
- }
- }
-
public void disconnect() {
LOGGER.info("Disconnecting...");
try {
@@ -239,7 +150,7 @@
if (consumeChannel != null && consumeChannel.isOpen()) {
try {
consumeChannel.basicConsume(
- properties.getString(Keys.CONSUME_QUEUE),
+ properties.getString(Keys.QUEUE_NAME),
false,
new MessageConsumer(consumeChannel));
LOGGER.debug("Start consuming message.");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
index 3d82b6a..bbf62a0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Keys.java
@@ -19,22 +19,11 @@
AMQP_USERNAME("amqp.username", null, "guest"),
AMQP_PASSWORD("amqp.password", null, "guest"),
QUEUE_NAME("queue.name", null, "gerrit.events"),
- QUEUE_DECLARE("queue.declare", null, false),
- QUEUE_DURABLE("queue.durable", null, true),
- QUEUE_AUTODELETE("queue.autoDelete", null, false),
- QUEUE_EXCLUSIVE("queue.exclusive", null, false),
+ QUEUE_CONSUME("queue.consume", null, false),
EXCHANGE_NAME("exchange.name", null, "gerrit.publish"),
- EXCHANGE_DECLARE("exchange.declare", null, false),
- EXCHANGE_TYPE("exchange.type", null, "fanout"),
- EXCHANGE_DURABLE("exchange.durable", null, false),
- EXCHANGE_AUTODELETE("exchange.autoDelete", null, false),
- BIND_STARTUP("bind.startup", null, false),
- BIND_ROUTINGKEY("bind.routingKey", null, ""),
MESSAGE_DELIVERY_MODE("message.deliveryMode", null, 1),
MESSAGE_PRIORITY("message.priority", null, 0),
MESSAGE_ROUTINGKEY("message.routingKey", null, ""),
- CONSUME_ENABLED("consume.enabled", null, false),
- CONSUME_QUEUE("consume.queue", null, "gerrit.review"),
GERRIT_NAME("gerrit.name", "gerrit-name", ""),
GERRIT_HOSTNAME("gerrit.hostname", "gerrit-host", ""),
GERRIT_SCHEME("gerrit.scheme", "gerrit-scheme", "ssh"),
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0ef4228..5fbc123 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -10,19 +10,9 @@
password = guest
[queue]
name = gerrit-queue
- decralation = false
- durable = true
- autoDelete = false
- exclusive = false
+ consume = false
[exchange]
name = exchange-for-gerrit-queue
- declaration = false
- type = fanout
- durable = false
- autoDelete = false
- [bind]
- startUp = false
- routingKey = com.foobar.www.gerrit
[message]
deliveryMode = 1
priority = 0
@@ -49,43 +39,12 @@
* `queue.name`
* The name of queue.
-* `queue.declare`
- * true if you want to declare queue on startup.
-
-* `queue.durable`
- * true if you want to declare a drable queue.
-
-* `queue.autoDelete`
- * true if you want to declare an autodelete queue.
-
-* `queue.exclusive`
- * true if you want to declare an exclusive queue.
+* `queue.consume`
+ * true if consume messages in queue.
* `exchange.name`
* The name of exchange.
-* `exchange.declare`
- * true if you want to declare exchange on startup.
-
-* `exchange.type`
- * The type of exchange. You can specify the following value:
- * "direct"
- * "fanout"
- * "topic"
-
-* `exchange.durable`
- * true if you want to declare a durable exchange.
-
-* `exchange.autoDelete`
- * true if you want to declare an autodelete exchange.
-
-* `bind.startUp`
- * true if you want to bind queue to exchange on startup.
- Also need to specify `queue.name` and `exchange.name`.
-
-* `bind.routingKey`
- * The name of routing key. This is used to bind queue to exchange.
-
* `message.deliveryMode`
* The delivery mode. if not specified, defaults to 1.
* 1 - non-persistent
@@ -134,17 +93,8 @@
|amqp.username | **guest**
|amqp.password | **guest**
|queue.name | **gerrit.events**
-|queue.declare | false
-|queue.durable | true
-|queue.autoDelete | false
-|queue.exclusive | false
+|queue.consume | false
|exchange.name | **gerrit.publish**
-|exchange.declare | false
-|exchange.type | **fanout**
-|exchange.durable | false
-|exchange.autoDelete | false
-|bind.startUp | false
-|bind.routingKey | *Empty*
|message.deliveryMode | 1
|message.priority | 0
|message.routingKey | *Empty*