Add MessagePublishers to listen ChangeEvent

This patch moves ChangeListener feature from RabbitMQManager to
MessagePublisher. Also moves complicated listener registration
to DefaultMessagePublisher.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
new file mode 100644
index 0000000..d5b94dc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Singleton
+public class DefaultMessagePublisher implements ChangeListener, LifecycleListener {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessagePublisher.class);
+
+  private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+
+  private final Properties properties;
+  private final AMQPSession session;
+  private final Gson gson;
+  private final Timer monitorTimer = new Timer();
+
+  @Inject
+  public DefaultMessagePublisher(
+      final Properties properties,
+      final AMQPSession session,
+      final Gson gson) {
+    this.properties = properties;
+    this.session = session;
+    this.gson = gson;
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Start default listener.");
+    session.connect();
+    monitorTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        if (!session.isOpen()) {
+          LOGGER.info("#start: try to reconnect");
+          session.connect();
+        }
+      }
+    }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("Stop default listener.");
+    monitorTimer.cancel();
+    session.disconnect();
+  }
+
+  @Override
+  public void onChangeEvent(ChangeEvent event) {
+    if (session.isOpen()) {
+      session.publishMessage(gson.toJson(event));
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
new file mode 100644
index 0000000..facf174
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
@@ -0,0 +1,173 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.gerrit.common.ChangeHooks;
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.AccountResolver;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.gson.Gson;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Singleton
+public class MessagePublisher implements ChangeListener, LifecycleListener {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
+
+  private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+
+  private final Properties properties;
+  private final AMQPSession session;
+  private final ChangeHooks hooks;
+  private final Gson gson;
+  private final WorkQueue workQueue;
+  private final AccountResolver accountResolver;
+  private final IdentifiedUser.GenericFactory userFactory;
+  private final ThreadLocalRequestContext threadLocalRequestContext;
+  private final PluginUser pluginUser;
+  private final SchemaFactory<ReviewDb> schemaFactory;
+  private final Timer monitorTimer = new Timer();
+  private ReviewDb db;
+  private Account userAccount;
+
+  @Inject
+  public MessagePublisher(
+      Properties properties,
+      AMQPSession session,
+      ChangeHooks hooks,
+      Gson gson,
+      WorkQueue workQueue,
+      AccountResolver accountResolver,
+      IdentifiedUser.GenericFactory userFactory,
+      ThreadLocalRequestContext threadLocalRequestContext,
+      PluginUser pluginUser,
+      SchemaFactory<ReviewDb> schemaFactory) {
+    this.properties = properties;
+    this.session = session;
+    this.hooks = hooks;
+    this.gson = gson;
+    this.workQueue = workQueue;
+    this.accountResolver = accountResolver;
+    this.userFactory = userFactory;
+    this.threadLocalRequestContext = threadLocalRequestContext;
+    this.pluginUser = pluginUser;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public void start() {
+    LOGGER.info("Start identified listener.");
+    session.connect();
+    monitorTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        if (!session.isOpen()) {
+          LOGGER.info("#start: try to reconnect");
+          session.connect();
+        }
+      }
+    }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+
+    if (properties.hasListenAs()) {
+      final String userName = properties.getListenAs();
+      final ChangeListener changeListener = this;
+      workQueue.getDefaultQueue().submit(new Runnable() {
+        @Override
+        public void run() {
+          RequestContext old = threadLocalRequestContext
+              .setContext(new RequestContext() {
+
+                @Override
+                public CurrentUser getCurrentUser() {
+                  return pluginUser;
+                }
+
+                @Override
+                public Provider<ReviewDb> getReviewDbProvider() {
+                  return new Provider<ReviewDb>() {
+                    @Override
+                    public ReviewDb get() {
+                      if (db == null) {
+                        try {
+                          db = schemaFactory.open();
+                        } catch (OrmException e) {
+                          throw new ProvisionException("Cannot open ReviewDb", e);
+                        }
+                      }
+                      return db;
+                    }
+                  };
+                }
+              });
+          try {
+            userAccount = accountResolver.find(userName);
+            if (userAccount == null) {
+              LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
+              return;
+            }
+
+            IdentifiedUser user = userFactory.create(userAccount.getId());
+            hooks.addChangeListener(changeListener, user);
+            LOGGER.info("Listen events as : {}", userName);
+          } catch (OrmException e) {
+            LOGGER.error("Could not query database for listenAs", e);
+            return;
+          } finally {
+            threadLocalRequestContext.setContext(old);
+            if (db != null) {
+              db.close();
+              db = null;
+            }
+          }
+        }
+      });
+    }
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.info("Start identified listener.");
+    monitorTimer.cancel();
+    session.disconnect();
+    hooks.removeChangeListener(this);
+  }
+
+  @Override
+  public void onChangeEvent(ChangeEvent event) {
+    if (session.isOpen()) {
+      session.publishMessage(gson.toJson(event));
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 9626063..c8dadbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -18,26 +18,17 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 
 class Module extends AbstractModule {
 
-  private final Properties properties;
-
-  @Inject
-  public Module(Properties properties) {
-    this.properties = properties;
-  }
-
   @Override
   protected void configure() {
     bind(AMQPSession.class);
-//    bind(Properties.class);
+    bind(Properties.class);
+    bind(MessagePublisher.class);
+    bind(DefaultMessagePublisher.class);
     bind(RabbitMQManager.class);
-    if (!properties.hasListenAs()) {
-      // No listenAs to filter events against. Register an unrestricted ChangeListener
-      DynamicSet.bind(binder(), ChangeListener.class).to(RabbitMQManager.class);
-    }
     DynamicSet.bind(binder(), LifecycleListener.class).to(RabbitMQManager.class);
+    DynamicSet.bind(binder(), ChangeListener.class).to(DefaultMessagePublisher.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
index 8e69528..f9c8701 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
@@ -14,153 +14,46 @@
 
 package com.googlesource.gerrit.plugins.rabbitmq;
 
-import com.google.gerrit.common.ChangeHooks;
-import com.google.gerrit.common.ChangeListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.reviewdb.client.Account;
-import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.CurrentUser;
-import com.google.gerrit.server.IdentifiedUser;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.AccountResolver;
-import com.google.gerrit.server.events.ChangeEvent;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.gerrit.server.util.RequestContext;
-import com.google.gerrit.server.util.ThreadLocalRequestContext;
-import com.google.gson.Gson;
-import com.google.gwtorm.server.OrmException;
-import com.google.gwtorm.server.SchemaFactory;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Timer;
-import java.util.TimerTask;
-
 @Singleton
-public class RabbitMQManager implements ChangeListener, LifecycleListener {
+public class RabbitMQManager implements LifecycleListener {
 
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(RabbitMQManager.class);
-  private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+  private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQManager.class);
+  private final MessagePublisher messagePublisher;
+  private final DefaultMessagePublisher defaultMessagePublisher;
   private final Properties properties;
-  private final AMQPSession session;
-  private final Gson gson = new Gson();
-  private final Timer monitorTimer = new Timer();
-  private final ChangeHooks hooks;
-  private final AccountResolver accountResolver;
-  private final IdentifiedUser.GenericFactory userFactory;
-  private final WorkQueue workQueue;
-  private final ThreadLocalRequestContext threadLocalRequestContext;
-  private final PluginUser pluginUser;
-  private final SchemaFactory<ReviewDb> schemaFactory;
-  private ReviewDb db;
-  private Account userAccount;
 
   @Inject
-  public RabbitMQManager(Properties properties,
-      AMQPSession session,
-      ChangeHooks hooks,
-      AccountResolver accountResolver,
-      IdentifiedUser.GenericFactory userFactory,
-      WorkQueue workQueue,
-      ThreadLocalRequestContext threadLocalRequestContext,
-      PluginUser pluginUser,
-      SchemaFactory<ReviewDb> schemaFactory) {
+  public RabbitMQManager(
+      MessagePublisher messagePublisher,
+      DefaultMessagePublisher defaultMessagePublisher,
+      Properties properties) {
+    this.messagePublisher = messagePublisher;
+    this.defaultMessagePublisher = defaultMessagePublisher;
     this.properties = properties;
-    this.session = session;
-    this.hooks = hooks;
-    this.accountResolver = accountResolver;
-    this.userFactory = userFactory;
-    this.workQueue = workQueue;
-    this.threadLocalRequestContext = threadLocalRequestContext;
-    this.pluginUser = pluginUser;
-    this.schemaFactory = schemaFactory;
   }
 
   @Override
   public void start() {
-    session.connect();
-    monitorTimer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        if (!session.isOpen()) {
-          LOGGER.info("#start: try to reconnect");
-          session.connect();
-        }
-      }
-    }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
-
     if (properties.hasListenAs()) {
-      final String userName = properties.getListenAs();
-      final ChangeListener changeListener = this;
-      workQueue.getDefaultQueue().submit(new Runnable() {
-        @Override
-        public void run() {
-          RequestContext old = threadLocalRequestContext
-              .setContext(new RequestContext() {
-
-                @Override
-                public CurrentUser getCurrentUser() {
-                  return pluginUser;
-                }
-
-                @Override
-                public Provider<ReviewDb> getReviewDbProvider() {
-                  return new Provider<ReviewDb>() {
-                    @Override
-                    public ReviewDb get() {
-                      if (db == null) {
-                        try {
-                          db = schemaFactory.open();
-                        } catch (OrmException e) {
-                          throw new ProvisionException("Cannot open ReviewDb", e);
-                        }
-                      }
-                      return db;
-                    }
-                  };
-                }
-              });
-          try {
-            userAccount = accountResolver.find(userName);
-            if (userAccount == null) {
-              LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
-              return;
-            }
-
-            IdentifiedUser user = userFactory.create(userAccount.getId());
-            hooks.addChangeListener(changeListener, user);
-            LOGGER.info("Listen events as : {}", userName);
-          } catch (OrmException e) {
-            LOGGER.error("Could not query database for listenAs", e);
-            return;
-          } finally {
-            threadLocalRequestContext.setContext(old);
-            if (db != null) {
-              db.close();
-              db = null;
-            }
-          }
-        }
-      });
+      messagePublisher.start();
+    } else {
+      defaultMessagePublisher.start();
     }
   }
 
   @Override
   public void stop() {
-    monitorTimer.cancel();
-    session.disconnect();
-    hooks.removeChangeListener(this);
+    if (properties.hasListenAs()) {
+      messagePublisher.stop();
+    } else {
+      defaultMessagePublisher.stop();
+    }
   }
-
-  @Override
-  public void onChangeEvent(ChangeEvent event) {
-    session.publishMessage(gson.toJson(event));
-  }
-
 }