Prepare to support consume message.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index 713b25a..c0215f5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -15,10 +15,12 @@
package com.googlesource.gerrit.plugins.rabbitmq;
import com.google.inject.Inject;
-
+import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
@@ -36,6 +38,7 @@
private final Properties properties;
private Connection connection;
private Channel publishChannel;
+ private Channel consumeChannel;
private volatile int failureCount = 0;
@Inject
@@ -50,12 +53,12 @@
return false;
}
- private Channel getPublishChannel() {
- Channel pubCh = null;
+ private Channel getChannel() {
+ Channel ch = null;
if (connection != null) {
try {
- pubCh = connection.createChannel();
- pubCh.addShutdownListener(this);
+ ch = connection.createChannel();
+ ch.addShutdownListener(this);
failureCount = 0;
} catch (Exception ex) {
LOGGER.warn("Failed to open publish channel.");
@@ -66,7 +69,7 @@
disconnect();
}
}
- return pubCh;
+ return ch;
}
public void connect() {
@@ -117,6 +120,10 @@
bind();
}
}
+ if (properties.getBoolean(Keys.CONSUME_ENABLED)) {
+ if (StringUtils.isNotEmpty(properties.getString(Keys.CONSUME_QUEUE))) {
+ }
+ }
LOGGER.info("Complete to setup channel.");
}
}
@@ -206,8 +213,8 @@
}
public void publishMessage(String message) {
- if (publishChannel == null) {
- publishChannel = getPublishChannel();
+ if (publishChannel == null || !publishChannel.isOpen()) {
+ publishChannel = getChannel();
}
if (publishChannel != null && publishChannel.isOpen()) {
try {
@@ -222,16 +229,66 @@
}
}
+ public void consumeMessage() {
+ if (consumeChannel == null || !consumeChannel.isOpen()) {
+ consumeChannel = getChannel();
+ }
+ if (consumeChannel != null && consumeChannel.isOpen()) {
+ try {
+ consumeChannel.basicConsume(
+ properties.getString(Keys.CONSUME_QUEUE),
+ false,
+ new MessageConsumer(consumeChannel));
+ LOGGER.debug("Start consuming message.");
+ } catch (Exception ex) {
+ LOGGER.warn("Error when consuming message.", ex);
+ }
+ }
+ }
+
@Override
public void shutdownCompleted(ShutdownSignalException exception) {
Object obj = exception.getReference();
if (obj instanceof Channel) {
- LOGGER.info("Channel closed.");
- publishChannel = null;
+ Channel ch = (Channel) obj;
+ if (ch.equals(publishChannel)) {
+ LOGGER.info("Publish channel closed.");
+ publishChannel = null;
+ } else if (ch.equals(consumeChannel)) {
+ LOGGER.info("Consume channel closed.");
+ consumeChannel = null;
+ }
} else if (obj instanceof Connection) {
LOGGER.info("Connection disconnected.");
connection = null;
}
}
+
+ public class MessageConsumer extends DefaultConsumer {
+
+ public MessageConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties props, byte[] body)
+ throws IOException {
+ try {
+ long deliveryTag = envelope.getDeliveryTag();
+
+ if (Properties.APPROVE_APPID.equals(props.getAppId()) &&
+ Properties.CONTENT_TYPE_JSON.equals(props.getContentType())) {
+ // TODO: Get message then input as review. required 2.9 or later.
+ }
+
+ getChannel().basicAck(deliveryTag, false);
+
+ } catch (IOException ex) {
+ throw ex;
+ } catch (RuntimeException ex) {
+ LOGGER.warn("caught exception in delivery handler", ex);
+ }
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
index 14fd285..f87fbd9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
@@ -37,10 +37,13 @@
public class Properties {
+ // TODO: Value will be replaced to "gerrit.event".
+ public final static String EVENT_APPID = "gerrit";
+ public final static String APPROVE_APPID = "gerrit.approve";
+ public final static String CONTENT_TYPE_JSON = "application/json";
+
private static final Logger LOGGER = LoggerFactory.getLogger(Properties.class);
private final static String CONFIG_FILENAME = "rabbitmq.config";
- private final static String GERRIT = "gerrit";
- private final static String CONTENT_TYPE_JSON = "application/json";
private final static int MINIMUM_CONNECTION_MONITOR_INTERVAL = 5000;
@@ -120,7 +123,7 @@
headers.put(Keys.GERRIT_VERSION.key, getGerritVersion());
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- builder.appId(GERRIT);
+ builder.appId(EVENT_APPID);
builder.contentEncoding(CharEncoding.UTF_8);
builder.contentType(CONTENT_TYPE_JSON);
builder.deliveryMode(getInt(Keys.MESSAGE_DELIVERY_MODE));