Add MessagePublishers to listen ChangeEvent
This patch moves ChangeListener feature from RabbitMQManager to
MessagePublisher. Also moves complicated listener registration
to DefaultMessagePublisher.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
new file mode 100644
index 0000000..d5b94dc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Singleton
+public class DefaultMessagePublisher implements ChangeListener, LifecycleListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessagePublisher.class);
+
+ private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+
+ private final Properties properties;
+ private final AMQPSession session;
+ private final Gson gson;
+ private final Timer monitorTimer = new Timer();
+
+ @Inject
+ public DefaultMessagePublisher(
+ final Properties properties,
+ final AMQPSession session,
+ final Gson gson) {
+ this.properties = properties;
+ this.session = session;
+ this.gson = gson;
+ }
+
+ @Override
+ public void start() {
+ LOGGER.info("Start default listener.");
+ session.connect();
+ monitorTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (!session.isOpen()) {
+ LOGGER.info("#start: try to reconnect");
+ session.connect();
+ }
+ }
+ }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.info("Stop default listener.");
+ monitorTimer.cancel();
+ session.disconnect();
+ }
+
+ @Override
+ public void onChangeEvent(ChangeEvent event) {
+ if (session.isOpen()) {
+ session.publishMessage(gson.toJson(event));
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
new file mode 100644
index 0000000..facf174
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
@@ -0,0 +1,173 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.gerrit.common.ChangeHooks;
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.AccountResolver;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.gson.Gson;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+@Singleton
+public class MessagePublisher implements ChangeListener, LifecycleListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
+
+ private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+
+ private final Properties properties;
+ private final AMQPSession session;
+ private final ChangeHooks hooks;
+ private final Gson gson;
+ private final WorkQueue workQueue;
+ private final AccountResolver accountResolver;
+ private final IdentifiedUser.GenericFactory userFactory;
+ private final ThreadLocalRequestContext threadLocalRequestContext;
+ private final PluginUser pluginUser;
+ private final SchemaFactory<ReviewDb> schemaFactory;
+ private final Timer monitorTimer = new Timer();
+ private ReviewDb db;
+ private Account userAccount;
+
+ @Inject
+ public MessagePublisher(
+ Properties properties,
+ AMQPSession session,
+ ChangeHooks hooks,
+ Gson gson,
+ WorkQueue workQueue,
+ AccountResolver accountResolver,
+ IdentifiedUser.GenericFactory userFactory,
+ ThreadLocalRequestContext threadLocalRequestContext,
+ PluginUser pluginUser,
+ SchemaFactory<ReviewDb> schemaFactory) {
+ this.properties = properties;
+ this.session = session;
+ this.hooks = hooks;
+ this.gson = gson;
+ this.workQueue = workQueue;
+ this.accountResolver = accountResolver;
+ this.userFactory = userFactory;
+ this.threadLocalRequestContext = threadLocalRequestContext;
+ this.pluginUser = pluginUser;
+ this.schemaFactory = schemaFactory;
+ }
+
+ @Override
+ public void start() {
+ LOGGER.info("Start identified listener.");
+ session.connect();
+ monitorTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (!session.isOpen()) {
+ LOGGER.info("#start: try to reconnect");
+ session.connect();
+ }
+ }
+ }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+
+ if (properties.hasListenAs()) {
+ final String userName = properties.getListenAs();
+ final ChangeListener changeListener = this;
+ workQueue.getDefaultQueue().submit(new Runnable() {
+ @Override
+ public void run() {
+ RequestContext old = threadLocalRequestContext
+ .setContext(new RequestContext() {
+
+ @Override
+ public CurrentUser getCurrentUser() {
+ return pluginUser;
+ }
+
+ @Override
+ public Provider<ReviewDb> getReviewDbProvider() {
+ return new Provider<ReviewDb>() {
+ @Override
+ public ReviewDb get() {
+ if (db == null) {
+ try {
+ db = schemaFactory.open();
+ } catch (OrmException e) {
+ throw new ProvisionException("Cannot open ReviewDb", e);
+ }
+ }
+ return db;
+ }
+ };
+ }
+ });
+ try {
+ userAccount = accountResolver.find(userName);
+ if (userAccount == null) {
+ LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
+ return;
+ }
+
+ IdentifiedUser user = userFactory.create(userAccount.getId());
+ hooks.addChangeListener(changeListener, user);
+ LOGGER.info("Listen events as : {}", userName);
+ } catch (OrmException e) {
+ LOGGER.error("Could not query database for listenAs", e);
+ return;
+ } finally {
+ threadLocalRequestContext.setContext(old);
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.info("Start identified listener.");
+ monitorTimer.cancel();
+ session.disconnect();
+ hooks.removeChangeListener(this);
+ }
+
+ @Override
+ public void onChangeEvent(ChangeEvent event) {
+ if (session.isOpen()) {
+ session.publishMessage(gson.toJson(event));
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 9626063..c8dadbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -18,26 +18,17 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
class Module extends AbstractModule {
- private final Properties properties;
-
- @Inject
- public Module(Properties properties) {
- this.properties = properties;
- }
-
@Override
protected void configure() {
bind(AMQPSession.class);
-// bind(Properties.class);
+ bind(Properties.class);
+ bind(MessagePublisher.class);
+ bind(DefaultMessagePublisher.class);
bind(RabbitMQManager.class);
- if (!properties.hasListenAs()) {
- // No listenAs to filter events against. Register an unrestricted ChangeListener
- DynamicSet.bind(binder(), ChangeListener.class).to(RabbitMQManager.class);
- }
DynamicSet.bind(binder(), LifecycleListener.class).to(RabbitMQManager.class);
+ DynamicSet.bind(binder(), ChangeListener.class).to(DefaultMessagePublisher.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
index 8e69528..f9c8701 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
@@ -14,153 +14,46 @@
package com.googlesource.gerrit.plugins.rabbitmq;
-import com.google.gerrit.common.ChangeHooks;
-import com.google.gerrit.common.ChangeListener;
import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.reviewdb.client.Account;
-import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.CurrentUser;
-import com.google.gerrit.server.IdentifiedUser;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.AccountResolver;
-import com.google.gerrit.server.events.ChangeEvent;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.gerrit.server.util.RequestContext;
-import com.google.gerrit.server.util.ThreadLocalRequestContext;
-import com.google.gson.Gson;
-import com.google.gwtorm.server.OrmException;
-import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Timer;
-import java.util.TimerTask;
-
@Singleton
-public class RabbitMQManager implements ChangeListener, LifecycleListener {
+public class RabbitMQManager implements LifecycleListener {
- private static final Logger LOGGER = LoggerFactory
- .getLogger(RabbitMQManager.class);
- private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQManager.class);
+ private final MessagePublisher messagePublisher;
+ private final DefaultMessagePublisher defaultMessagePublisher;
private final Properties properties;
- private final AMQPSession session;
- private final Gson gson = new Gson();
- private final Timer monitorTimer = new Timer();
- private final ChangeHooks hooks;
- private final AccountResolver accountResolver;
- private final IdentifiedUser.GenericFactory userFactory;
- private final WorkQueue workQueue;
- private final ThreadLocalRequestContext threadLocalRequestContext;
- private final PluginUser pluginUser;
- private final SchemaFactory<ReviewDb> schemaFactory;
- private ReviewDb db;
- private Account userAccount;
@Inject
- public RabbitMQManager(Properties properties,
- AMQPSession session,
- ChangeHooks hooks,
- AccountResolver accountResolver,
- IdentifiedUser.GenericFactory userFactory,
- WorkQueue workQueue,
- ThreadLocalRequestContext threadLocalRequestContext,
- PluginUser pluginUser,
- SchemaFactory<ReviewDb> schemaFactory) {
+ public RabbitMQManager(
+ MessagePublisher messagePublisher,
+ DefaultMessagePublisher defaultMessagePublisher,
+ Properties properties) {
+ this.messagePublisher = messagePublisher;
+ this.defaultMessagePublisher = defaultMessagePublisher;
this.properties = properties;
- this.session = session;
- this.hooks = hooks;
- this.accountResolver = accountResolver;
- this.userFactory = userFactory;
- this.workQueue = workQueue;
- this.threadLocalRequestContext = threadLocalRequestContext;
- this.pluginUser = pluginUser;
- this.schemaFactory = schemaFactory;
}
@Override
public void start() {
- session.connect();
- monitorTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (!session.isOpen()) {
- LOGGER.info("#start: try to reconnect");
- session.connect();
- }
- }
- }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
-
if (properties.hasListenAs()) {
- final String userName = properties.getListenAs();
- final ChangeListener changeListener = this;
- workQueue.getDefaultQueue().submit(new Runnable() {
- @Override
- public void run() {
- RequestContext old = threadLocalRequestContext
- .setContext(new RequestContext() {
-
- @Override
- public CurrentUser getCurrentUser() {
- return pluginUser;
- }
-
- @Override
- public Provider<ReviewDb> getReviewDbProvider() {
- return new Provider<ReviewDb>() {
- @Override
- public ReviewDb get() {
- if (db == null) {
- try {
- db = schemaFactory.open();
- } catch (OrmException e) {
- throw new ProvisionException("Cannot open ReviewDb", e);
- }
- }
- return db;
- }
- };
- }
- });
- try {
- userAccount = accountResolver.find(userName);
- if (userAccount == null) {
- LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
- return;
- }
-
- IdentifiedUser user = userFactory.create(userAccount.getId());
- hooks.addChangeListener(changeListener, user);
- LOGGER.info("Listen events as : {}", userName);
- } catch (OrmException e) {
- LOGGER.error("Could not query database for listenAs", e);
- return;
- } finally {
- threadLocalRequestContext.setContext(old);
- if (db != null) {
- db.close();
- db = null;
- }
- }
- }
- });
+ messagePublisher.start();
+ } else {
+ defaultMessagePublisher.start();
}
}
@Override
public void stop() {
- monitorTimer.cancel();
- session.disconnect();
- hooks.removeChangeListener(this);
+ if (properties.hasListenAs()) {
+ messagePublisher.stop();
+ } else {
+ defaultMessagePublisher.stop();
+ }
}
-
- @Override
- public void onChangeEvent(ChangeEvent event) {
- session.publishMessage(gson.toJson(event));
- }
-
}