Prepare to support consume message.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index 713b25a..c0215f5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -15,10 +15,12 @@
 package com.googlesource.gerrit.plugins.rabbitmq;
 
 import com.google.inject.Inject;
-
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ShutdownListener;
 import com.rabbitmq.client.ShutdownSignalException;
 
@@ -36,6 +38,7 @@
   private final Properties properties;
   private Connection connection;
   private Channel publishChannel;
+  private Channel consumeChannel;
   private volatile int failureCount = 0;
 
   @Inject
@@ -50,12 +53,12 @@
     return false;
   }
 
-  private Channel getPublishChannel() {
-    Channel pubCh = null;
+  private Channel getChannel() {
+    Channel ch = null;
     if (connection != null) {
       try {
-        pubCh = connection.createChannel();
-        pubCh.addShutdownListener(this);
+        ch = connection.createChannel();
+        ch.addShutdownListener(this);
         failureCount = 0;
       } catch (Exception ex) {
         LOGGER.warn("Failed to open publish channel.");
@@ -66,7 +69,7 @@
         disconnect();
       }
     }
-    return pubCh;
+    return ch;
   }
 
   public void connect() {
@@ -117,6 +120,10 @@
           bind();
         }
       }
+      if (properties.getBoolean(Keys.CONSUME_ENABLED)) {
+        if (StringUtils.isNotEmpty(properties.getString(Keys.CONSUME_QUEUE))) {
+        }
+      }
       LOGGER.info("Complete to setup channel.");
     }
   }
@@ -206,8 +213,8 @@
   }
 
   public void publishMessage(String message) {
-    if (publishChannel == null) {
-      publishChannel = getPublishChannel();
+    if (publishChannel == null || !publishChannel.isOpen()) {
+      publishChannel = getChannel();
     }
     if (publishChannel != null && publishChannel.isOpen()) {
       try {
@@ -222,16 +229,66 @@
     }
   }
 
+  public void consumeMessage() {
+    if (consumeChannel == null || !consumeChannel.isOpen()) {
+      consumeChannel = getChannel();
+    }
+    if (consumeChannel != null && consumeChannel.isOpen()) {
+      try {
+        consumeChannel.basicConsume(
+            properties.getString(Keys.CONSUME_QUEUE),
+            false,
+            new MessageConsumer(consumeChannel));
+        LOGGER.debug("Start consuming message.");
+      } catch (Exception ex) {
+        LOGGER.warn("Error when consuming message.", ex);
+      }
+    }
+  }
+
   @Override
   public void shutdownCompleted(ShutdownSignalException exception) {
     Object obj = exception.getReference();
 
     if (obj instanceof Channel) {
-      LOGGER.info("Channel closed.");
-      publishChannel = null;
+      Channel ch = (Channel) obj;
+      if (ch.equals(publishChannel)) {
+        LOGGER.info("Publish channel closed.");
+        publishChannel = null;
+      } else if (ch.equals(consumeChannel)) {
+        LOGGER.info("Consume channel closed.");
+        consumeChannel = null;
+      }
     } else if (obj instanceof Connection) {
       LOGGER.info("Connection disconnected.");
       connection = null;
     }
   }
+
+  public class MessageConsumer extends DefaultConsumer {
+
+    public MessageConsumer(Channel channel) {
+        super(channel);
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties props, byte[] body)
+            throws IOException {
+      try {
+        long deliveryTag = envelope.getDeliveryTag();
+
+        if (Properties.APPROVE_APPID.equals(props.getAppId()) &&
+            Properties.CONTENT_TYPE_JSON.equals(props.getContentType())) {
+          // TODO: Get message then input as review. required 2.9 or later.
+        }
+
+        getChannel().basicAck(deliveryTag, false);
+
+      } catch (IOException ex) {
+        throw ex;
+      } catch (RuntimeException ex) {
+        LOGGER.warn("caught exception in delivery handler", ex);
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
index 14fd285..f87fbd9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
@@ -37,10 +37,13 @@
 
 public class Properties {
 
+  // TODO: Value will be replaced to "gerrit.event".
+  public final static String EVENT_APPID = "gerrit";
+  public final static String APPROVE_APPID = "gerrit.approve";
+  public final static String CONTENT_TYPE_JSON = "application/json";
+
   private static final Logger LOGGER = LoggerFactory.getLogger(Properties.class);
   private final static String CONFIG_FILENAME = "rabbitmq.config";
-  private final static String GERRIT = "gerrit";
-  private final static String CONTENT_TYPE_JSON = "application/json";
 
   private final static int MINIMUM_CONNECTION_MONITOR_INTERVAL = 5000;
 
@@ -120,7 +123,7 @@
       headers.put(Keys.GERRIT_VERSION.key, getGerritVersion());
 
       AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
-      builder.appId(GERRIT);
+      builder.appId(EVENT_APPID);
       builder.contentEncoding(CharEncoding.UTF_8);
       builder.contentType(CONTENT_TYPE_JSON);
       builder.deliveryMode(getInt(Keys.MESSAGE_DELIVERY_MODE));