Cleanup codes
Package structure is dyamically changed.
* Decoupled interface from implementation
* More beanize
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 378cf75..e68f53b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -20,30 +20,33 @@
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.googlesource.gerrit.plugins.rabbitmq.config.AMQProperties;
-import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
-import com.googlesource.gerrit.plugins.rabbitmq.message.DefaultMessagePublisher;
+import com.googlesource.gerrit.plugins.rabbitmq.message.DefaultChangeListener;
+import com.googlesource.gerrit.plugins.rabbitmq.message.IdentifiedChangeListener;
import com.googlesource.gerrit.plugins.rabbitmq.message.MessagePublisher;
-import com.googlesource.gerrit.plugins.rabbitmq.session.AMQPSession;
+import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
+import com.googlesource.gerrit.plugins.rabbitmq.message.PublisherFactory;
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactory;
+import com.googlesource.gerrit.plugins.rabbitmq.session.impl.AMQPSession;
import com.googlesource.gerrit.plugins.rabbitmq.solver.BCSolver;
+import com.googlesource.gerrit.plugins.rabbitmq.solver.Solver;
+import com.googlesource.gerrit.plugins.rabbitmq.solver.SolverFactory;
class Module extends AbstractModule {
@Override
protected void configure() {
- install(new FactoryModuleBuilder().build(Properties.Factory.class));
- install(new FactoryModuleBuilder().build(AMQProperties.Factory.class));
- install(new FactoryModuleBuilder().build(AMQPSession.Factory.class));
- install(new FactoryModuleBuilder().build(MessagePublisher.Factory.class));
bind(PropertiesStore.class);
bind(BCSolver.class);
-// bind(AMQPSession.class);
-// bind(Properties.class);
-// bind(MessagePublisher.class);
- bind(DefaultMessagePublisher.class);
+ bind(IdentifiedChangeListener.class);
bind(RabbitMQManager.class);
+
+ install(new FactoryModuleBuilder().implement(Solver.class, BCSolver.class).build(SolverFactory.class));
+ install(new FactoryModuleBuilder().implement(Session.class, AMQPSession.class).build(SessionFactory.class));
+ install(new FactoryModuleBuilder().implement(Publisher.class, MessagePublisher.class).build(PublisherFactory.class));
+
DynamicSet.bind(binder(), LifecycleListener.class).to(RabbitMQManager.class);
- DynamicSet.bind(binder(), LifecycleListener.class).to(DefaultMessagePublisher.class);
- DynamicSet.bind(binder(), ChangeListener.class).to(DefaultMessagePublisher.class);
+ DynamicSet.bind(binder(), LifecycleListener.class).to(DefaultChangeListener.class);
+ DynamicSet.bind(binder(), ChangeListener.class).to(DefaultChangeListener.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java
index 0073a95..ff77481 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java
@@ -16,11 +16,14 @@
import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.GerritServerConfig;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.internal.GerritFrontUrl;
+import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,29 +48,29 @@
private final List<Properties> propertiesStore;
private final String pluginName;
private final Path pluginDataDir;
- private final Properties.Factory propFactory;
+ private final Config serverConfig;
@Inject
public PropertiesStore(
@PluginName final String pluginName,
@PluginData final File pluginData,
- final Properties.Factory propFactory
- ) {
+ @GerritServerConfig final Config serverConfig) {
this.propertiesStore = new ArrayList<>();
this.pluginName = pluginName;
this.pluginDataDir = pluginData.toPath();
- this.propFactory = propFactory;
+ this.serverConfig = serverConfig;
}
public void load() {
// Load base
- Properties base = propFactory.create(pluginDataDir.resolve(pluginName + FILE_EXT));
+ Properties base = new Properties(pluginDataDir.resolve(pluginName + FILE_EXT));
base.load();
+ ((GerritFrontUrl)base).setGerritFrontUrlFromConfig(serverConfig);
// Load site
try (DirectoryStream<Path> ds = Files.newDirectoryStream(pluginDataDir.resolve(SITE_DIR), "*" + FILE_EXT)) {
for (Path configFile : ds) {
- Properties site = propFactory.create(configFile);
+ Properties site = new Properties(configFile);
if (site.load(base)) {
propertiesStore.add(site);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
index 195c5ef..95aaf60 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
@@ -19,10 +19,16 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
-import com.googlesource.gerrit.plugins.rabbitmq.message.DefaultMessagePublisher;
+import com.googlesource.gerrit.plugins.rabbitmq.message.DefaultChangeListener;
+import com.googlesource.gerrit.plugins.rabbitmq.message.IdentifiedChangeListener;
import com.googlesource.gerrit.plugins.rabbitmq.message.MessagePublisher;
-import com.googlesource.gerrit.plugins.rabbitmq.session.AMQPSession;
+import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
+import com.googlesource.gerrit.plugins.rabbitmq.message.PublisherFactory;
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactory;
import com.googlesource.gerrit.plugins.rabbitmq.solver.BCSolver;
+import com.googlesource.gerrit.plugins.rabbitmq.solver.Solver;
+import com.googlesource.gerrit.plugins.rabbitmq.solver.SolverFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,47 +40,54 @@
public class RabbitMQManager implements LifecycleListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQManager.class);
- private final DefaultMessagePublisher defaultMessagePublisher;
- private final MessagePublisher.Factory publisherFactory;
- private final AMQPSession.Factory sessionFactory;
+ private final DefaultChangeListener defaultChangeListener;
+ private final IdentifiedChangeListener identifiedChangeListener;
+ private final PublisherFactory publisherFactory;
private final PropertiesStore propertiesStore;
- private final BCSolver bcSolver;
- private final List<MessagePublisher> publisherList = new ArrayList<>();
+ private final SolverFactory solverFactory;
+ private final List<Publisher> publisherList = new ArrayList<>();
@Inject
public RabbitMQManager(
- DefaultMessagePublisher defaultMessagePublisher,
- MessagePublisher.Factory publisherFactory,
- AMQPSession.Factory sessionFactory,
+ DefaultChangeListener defaultChangeListener,
+ IdentifiedChangeListener identifiedChangeListener,
+ PublisherFactory publisherFactory,
PropertiesStore propertiesStore,
- BCSolver bcSolver) {
- this.defaultMessagePublisher = defaultMessagePublisher;
+ SolverFactory solverFactory) {
+ this.defaultChangeListener = defaultChangeListener;
+ this.identifiedChangeListener = identifiedChangeListener;
this.publisherFactory = publisherFactory;
- this.sessionFactory = sessionFactory;
this.propertiesStore = propertiesStore;
- this.bcSolver = bcSolver;
+ this.solverFactory = solverFactory;
}
@Override
public void start() {
- bcSolver.solve();
+ Solver solver = solverFactory.create();
+ solver.solve();
+
propertiesStore.load();
for (Properties properties : propertiesStore) {
- AMQPSession session = sessionFactory.create(properties);
+ Publisher publisher = publisherFactory.create(properties);
+ publisher.start();
if (properties.hasListenAs()) {
- MessagePublisher publisher = publisherFactory.create(session);
- publisher.start();
- publisherList.add(publisher);
+ identifiedChangeListener.addPublisher(publisher, properties.getListenAs());
} else {
- defaultMessagePublisher.addSession(session);
+ defaultChangeListener.addPublisher(publisher);
}
+ publisherList.add(publisher);
}
}
@Override
public void stop() {
- for (MessagePublisher publisher : publisherList) {
+ for (Publisher publisher : publisherList) {
publisher.stop();
+ if (publisher.getSession().getProperties().hasListenAs()) {
+ identifiedChangeListener.removePublisher(publisher);
+ } else {
+ defaultChangeListener.removePublisher(publisher);
+ }
}
publisherList.clear();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
index b369a19..15adc06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
@@ -14,9 +14,6 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
import com.googlesource.gerrit.plugins.rabbitmq.Keys;
import com.rabbitmq.client.AMQP;
@@ -29,10 +26,6 @@
public class AMQProperties {
- public interface Factory {
- AMQProperties create(Properties properties);
- }
-
public final static String EVENT_APPID = "gerrit";
public final static String CONTENT_TYPE_JSON = "application/json";
@@ -41,8 +34,7 @@
private final Properties properties;
private AMQP.BasicProperties amqpProperties;
- @Inject
- public AMQProperties(@Assisted Properties properties) {
+ public AMQProperties(Properties properties) {
this.properties = properties;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
index 83121b6..1e88689 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
@@ -15,11 +15,9 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
import com.google.gerrit.common.Version;
-import com.google.gerrit.server.config.GerritServerConfig;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.rabbitmq.Keys;
+import com.googlesource.gerrit.plugins.rabbitmq.config.internal.GerritFrontUrl;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
@@ -34,37 +32,25 @@
import java.nio.file.Files;
import java.nio.file.Path;
-public class Properties {
-
- public interface Factory {
- Properties create(Path propertiesFile);
- }
+public class Properties implements GerritFrontUrl {
private static final Logger LOGGER = LoggerFactory.getLogger(Properties.class);
private final static int MINIMUM_CONNECTION_MONITOR_INTERVAL = 5000;
private final Path propertiesFile;
- private final Config serverConfig;
- private final AMQProperties.Factory amqFactory;
private Config pluginConfig;
private AMQProperties amqProperties;
- @Inject
- public Properties(
- @GerritServerConfig final Config serverConfig,
- final AMQProperties.Factory amqFactory,
- @Assisted final Path propertiesFile) {
+ public Properties(final Path propertiesFile) {
this.propertiesFile = propertiesFile;
- this.serverConfig = serverConfig;
- this.amqFactory = amqFactory;
}
public boolean load() {
return load(null);
}
- public boolean load(Properties baseProperties) {
+ public boolean load(final Properties baseProperties) {
pluginConfig = new Config();
LOGGER.info("Loading {} ...", propertiesFile);
if (!Files.exists(propertiesFile)) {
@@ -120,7 +106,7 @@
}
public String getGerritFrontUrl() {
- return StringUtils.stripToEmpty(serverConfig.getString(
+ return StringUtils.stripToEmpty(pluginConfig.getString(
Keys.GERRIT_FRONT_URL.section, null, Keys.GERRIT_FRONT_URL.name));
}
@@ -147,8 +133,17 @@
public AMQProperties getAMQProperties() {
if (amqProperties == null) {
- amqProperties = amqFactory.create(this);
+ amqProperties = new AMQProperties(this);
}
return amqProperties;
}
+
+ @Override
+ public void setGerritFrontUrlFromConfig(Config config) {
+ if (pluginConfig != null) {
+ pluginConfig.setString(
+ Keys.GERRIT_FRONT_URL.section, null, Keys.GERRIT_FRONT_URL.name,
+ config.getString(Keys.GERRIT_FRONT_URL.section, null, Keys.GERRIT_FRONT_URL.name));
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/internal/GerritFrontUrl.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/internal/GerritFrontUrl.java
new file mode 100644
index 0000000..233c156
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/internal/GerritFrontUrl.java
@@ -0,0 +1,7 @@
+package com.googlesource.gerrit.plugins.rabbitmq.config.internal;
+
+import org.eclipse.jgit.lib.Config;
+
+public interface GerritFrontUrl {
+ public void setGerritFrontUrlFromConfig(Config config);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultChangeListener.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultChangeListener.java
new file mode 100644
index 0000000..067a6d6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultChangeListener.java
@@ -0,0 +1,69 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.message;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+@Singleton
+public class DefaultChangeListener implements ChangeListener, LifecycleListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChangeListener.class);
+
+ private final Set<Publisher> publishers = new CopyOnWriteArraySet<>();
+
+ public void addPublisher(Publisher publisher) {
+ publishers.add(publisher);
+ }
+
+ public void removePublisher(Publisher publisher) {
+ publishers.remove(publisher);
+ }
+
+ public void clear() {
+ publishers.clear();
+ }
+
+ @Override
+ public void start() {
+ LOGGER.info("Start default listener.");
+ for (Publisher publisher : publishers) {
+ publisher.start();
+ }
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.info("Stop default listener.");
+ for (Publisher publisher : publishers) {
+ publisher.stop();
+ }
+ }
+
+ @Override
+ public void onChangeEvent(ChangeEvent event) {
+ for (Publisher publisher : publishers) {
+ publisher.onChangeEvent(event);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultMessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultMessagePublisher.java
deleted file mode 100644
index b4e528a..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/DefaultMessagePublisher.java
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.rabbitmq.message;
-
-import com.google.gerrit.common.ChangeListener;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.server.events.ChangeEvent;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import com.googlesource.gerrit.plugins.rabbitmq.Keys;
-import com.googlesource.gerrit.plugins.rabbitmq.session.AMQPSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Singleton
-public class DefaultMessagePublisher implements ChangeListener, LifecycleListener {
-
- public static class SessionEntry {
- public AMQPSession session;
- public Timer monitorTimer;
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessagePublisher.class);
-
- private final static int MONITOR_FIRSTTIME_DELAY = 15000;
-
- private final Set<SessionEntry> sessionEntries = new CopyOnWriteArraySet<>();
- private final Gson gson;
-
- @Inject
- public DefaultMessagePublisher(
- final Gson gson) {
- this.gson = gson;
- }
-
- private void openSession(final SessionEntry entry) {
- if (!entry.session.isOpen()) {
- entry.session.connect();
- entry.monitorTimer = new Timer();
- entry.monitorTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (!entry.session.isOpen()) {
- LOGGER.info("#start: try to reconnect");
- entry.session.connect();
- }
- }
- }, MONITOR_FIRSTTIME_DELAY, entry.session.getProperties().getInt(Keys.MONITOR_INTERVAL));
- }
- }
- public void addSession(final AMQPSession session) {
- SessionEntry entry = new SessionEntry();
- entry.session = session;
- openSession(entry);
- sessionEntries.add(entry);
- }
-
- @Override
- public void start() {
- LOGGER.info("Start default listener.");
- for (SessionEntry entry : sessionEntries) {
- openSession(entry);
- }
- }
-
- @Override
- public void stop() {
- LOGGER.info("Stop default listener.");
- for (SessionEntry entry : sessionEntries) {
- entry.monitorTimer.cancel();
- entry.session.disconnect();
- }
- }
-
- @Override
- public void onChangeEvent(ChangeEvent event) {
- for (SessionEntry entry : sessionEntries) {
- if (entry.session.isOpen()) {
- entry.session.publishMessage(gson.toJson(event));
- }
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/IdentifiedChangeListener.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/IdentifiedChangeListener.java
new file mode 100644
index 0000000..5f5a756
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/IdentifiedChangeListener.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.message;
+
+import com.google.gerrit.common.ChangeHooks;
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.AccountResolver;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IdentifiedChangeListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IdentifiedChangeListener.class);
+
+ private final ChangeHooks hooks;
+ private final WorkQueue workQueue;
+ private final AccountResolver accountResolver;
+ private final IdentifiedUser.GenericFactory userFactory;
+ private final ThreadLocalRequestContext threadLocalRequestContext;
+ private final PluginUser pluginUser;
+ private final SchemaFactory<ReviewDb> schemaFactory;
+
+ @Inject
+ public IdentifiedChangeListener(
+ ChangeHooks hooks,
+ WorkQueue workQueue,
+ AccountResolver accountResolver,
+ IdentifiedUser.GenericFactory userFactory,
+ ThreadLocalRequestContext threadLocalRequestContext,
+ PluginUser pluginUser,
+ SchemaFactory<ReviewDb> schemaFactory) {
+ this.hooks = hooks;
+ this.workQueue = workQueue;
+ this.accountResolver = accountResolver;
+ this.userFactory = userFactory;
+ this.threadLocalRequestContext = threadLocalRequestContext;
+ this.pluginUser = pluginUser;
+ this.schemaFactory = schemaFactory;
+ }
+
+ public void addPublisher(final Publisher publisher, final String userName) {
+ workQueue.getDefaultQueue().submit(new Runnable() {
+ private ReviewDb db;
+ private Account userAccount;
+
+ @Override
+ public void run() {
+ RequestContext old = threadLocalRequestContext
+ .setContext(new RequestContext() {
+
+ @Override
+ public CurrentUser getCurrentUser() {
+ return pluginUser;
+ }
+
+ @Override
+ public Provider<ReviewDb> getReviewDbProvider() {
+ return new Provider<ReviewDb>() {
+ @Override
+ public ReviewDb get() {
+ if (db == null) {
+ try {
+ db = schemaFactory.open();
+ } catch (OrmException e) {
+ throw new ProvisionException("Cannot open ReviewDb", e);
+ }
+ }
+ return db;
+ }
+ };
+ }
+ });
+ try {
+ userAccount = accountResolver.find(userName);
+ if (userAccount == null) {
+ LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
+ return;
+ }
+
+ IdentifiedUser user = userFactory.create(userAccount.getId());
+ hooks.addChangeListener(publisher, user);
+ LOGGER.info("Listen events as : {}", userName);
+ } catch (OrmException e) {
+ LOGGER.error("Could not query database for listenAs", e);
+ return;
+ } finally {
+ threadLocalRequestContext.setContext(old);
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+ }
+ });
+ }
+
+ public void removePublisher(Publisher publisher) {
+ hooks.removeChangeListener(publisher);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 010b5e2..3d34ed7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -14,29 +14,15 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
-import com.google.gerrit.common.ChangeHooks;
-import com.google.gerrit.common.ChangeListener;
import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.reviewdb.client.Account;
-import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.CurrentUser;
-import com.google.gerrit.server.IdentifiedUser;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.AccountResolver;
import com.google.gerrit.server.events.ChangeEvent;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.gerrit.server.util.RequestContext;
-import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gson.Gson;
-import com.google.gwtorm.server.OrmException;
-import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
import com.google.inject.assistedinject.Assisted;
-import com.googlesource.gerrit.plugins.rabbitmq.Keys;
-import com.googlesource.gerrit.plugins.rabbitmq.session.AMQPSession;
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,133 +30,79 @@
import java.util.Timer;
import java.util.TimerTask;
-public class MessagePublisher implements ChangeListener, LifecycleListener {
-
- public interface Factory {
- MessagePublisher create(AMQPSession session);
- }
+public class MessagePublisher implements Publisher, LifecycleListener {
private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
private final static int MONITOR_FIRSTTIME_DELAY = 15000;
- private final AMQPSession session;
- private final ChangeHooks hooks;
+ private final Session session;
private final Gson gson;
- private final WorkQueue workQueue;
- private final AccountResolver accountResolver;
- private final IdentifiedUser.GenericFactory userFactory;
- private final ThreadLocalRequestContext threadLocalRequestContext;
- private final PluginUser pluginUser;
- private final SchemaFactory<ReviewDb> schemaFactory;
private final Timer monitorTimer = new Timer();
- private ReviewDb db;
- private Account userAccount;
+ private boolean available = true;
@Inject
public MessagePublisher(
- @Assisted AMQPSession session,
- ChangeHooks hooks,
- Gson gson,
- WorkQueue workQueue,
- AccountResolver accountResolver,
- IdentifiedUser.GenericFactory userFactory,
- ThreadLocalRequestContext threadLocalRequestContext,
- PluginUser pluginUser,
- SchemaFactory<ReviewDb> schemaFactory) {
- this.session = session;
- this.hooks = hooks;
+ @Assisted Properties properties,
+ SessionFactory sessionFactory,
+ Gson gson) {
+ this.session = sessionFactory.create(properties);
this.gson = gson;
- this.workQueue = workQueue;
- this.accountResolver = accountResolver;
- this.userFactory = userFactory;
- this.threadLocalRequestContext = threadLocalRequestContext;
- this.pluginUser = pluginUser;
- this.schemaFactory = schemaFactory;
}
@Override
public void start() {
- LOGGER.info("Start identified listener.");
- session.connect();
- monitorTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (!session.isOpen()) {
- LOGGER.info("#start: try to reconnect");
- session.connect();
- }
- }
- }, MONITOR_FIRSTTIME_DELAY, session.getProperties().getInt(Keys.MONITOR_INTERVAL));
-
- if (session.getProperties().hasListenAs()) {
- final String userName = session.getProperties().getListenAs();
- final ChangeListener changeListener = this;
- workQueue.getDefaultQueue().submit(new Runnable() {
+ if (!session.isOpen()) {
+ session.connect();
+ monitorTimer.schedule(new TimerTask() {
@Override
public void run() {
- RequestContext old = threadLocalRequestContext
- .setContext(new RequestContext() {
-
- @Override
- public CurrentUser getCurrentUser() {
- return pluginUser;
- }
-
- @Override
- public Provider<ReviewDb> getReviewDbProvider() {
- return new Provider<ReviewDb>() {
- @Override
- public ReviewDb get() {
- if (db == null) {
- try {
- db = schemaFactory.open();
- } catch (OrmException e) {
- throw new ProvisionException("Cannot open ReviewDb", e);
- }
- }
- return db;
- }
- };
- }
- });
- try {
- userAccount = accountResolver.find(userName);
- if (userAccount == null) {
- LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
- return;
- }
-
- IdentifiedUser user = userFactory.create(userAccount.getId());
- hooks.addChangeListener(changeListener, user);
- LOGGER.info("Listen events as : {}", userName);
- } catch (OrmException e) {
- LOGGER.error("Could not query database for listenAs", e);
- return;
- } finally {
- threadLocalRequestContext.setContext(old);
- if (db != null) {
- db.close();
- db = null;
- }
+ if (!session.isOpen()) {
+ LOGGER.info("#start: try to reconnect");
+ session.connect();
}
}
- });
+ }, MONITOR_FIRSTTIME_DELAY, session.getProperties().getConnectionMonitorInterval());
+ available = true;
}
}
@Override
public void stop() {
- LOGGER.info("Start identified listener.");
monitorTimer.cancel();
session.disconnect();
- hooks.removeChangeListener(this);
+ available = false;
}
@Override
public void onChangeEvent(ChangeEvent event) {
- if (session.isOpen()) {
- session.publishMessage(gson.toJson(event));
+ if (available && session.isOpen()) {
+ session.publish(gson.toJson(event));
}
}
+
+ @Override
+ public void enable() {
+ available = true;
+ }
+
+ @Override
+ public void disable() {
+ available = false;
+ }
+
+ @Override
+ public boolean isEnable() {
+ return available;
+ }
+
+ @Override
+ public Session getSession() {
+ return session;
+ }
+
+ @Override
+ public String getName() {
+ return session.getProperties().getName();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
new file mode 100644
index 0000000..d88e1e0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -0,0 +1,15 @@
+package com.googlesource.gerrit.plugins.rabbitmq.message;
+
+import com.google.gerrit.common.ChangeListener;
+
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+
+public interface Publisher extends ChangeListener {
+ public void start();
+ public void stop();
+ public void enable();
+ public void disable();
+ public boolean isEnable();
+ public Session getSession();
+ public String getName();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/PublisherFactory.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/PublisherFactory.java
new file mode 100644
index 0000000..225f201
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/PublisherFactory.java
@@ -0,0 +1,21 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq.message;
+
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+
+public interface PublisherFactory {
+ Publisher create(Properties properties);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
new file mode 100644
index 0000000..5bd51ef
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.rabbitmq.session;
+
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+
+public interface Session {
+ public Properties getProperties();
+ public boolean isOpen();
+ public void connect();
+ public void disconnect();
+ public void publish(String message);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SessionFactory.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SessionFactory.java
new file mode 100644
index 0000000..12683af
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/SessionFactory.java
@@ -0,0 +1,20 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.rabbitmq.session;
+
+import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+
+public interface SessionFactory {
+ Session create(Properties properties);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
similarity index 73%
rename from src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/AMQPSession.java
rename to src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
index d6c5900..6d1f6cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/impl/AMQPSession.java
@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.rabbitmq.session;
+package com.googlesource.gerrit.plugins.rabbitmq.session.impl;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.rabbitmq.Keys;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
// import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -30,17 +31,14 @@
import org.apache.commons.codec.CharEncoding;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
-public class AMQPSession implements ShutdownListener {
-
- public interface Factory {
- AMQPSession create(Properties properties);
- }
+public final class AMQPSession implements Session, ShutdownListener {
private static final Logger LOGGER = LoggerFactory.getLogger(AMQPSession.class);
private final Properties properties;
@@ -53,10 +51,16 @@
this.properties = properties;
}
+ private String MSG(String msg) {
+ return String.format("[%s] %s", properties.getName(), msg);
+ }
+
+ @Override
public Properties getProperties() {
return properties;
}
+ @Override
public boolean isOpen() {
if (connection != null) {
return true;
@@ -68,17 +72,17 @@
Channel ch = null;
if (connection == null) {
connect();
- }
- if (connection != null) {
+ } else {
try {
ch = connection.createChannel();
ch.addShutdownListener(this);
failureCount = 0;
+ LOGGER.info(MSG("Publish channel opened."));
} catch (Exception ex) {
- LOGGER.warn("Failed to open publish channel.");
+ LOGGER.warn(MSG("Failed to open publish channel."));
failureCount++;
}
- if (failureCount > properties.getInt(Keys.MONITOR_FAILURECOUNT)) {
+ if (failureCount > properties.getConnectionMonitorInterval()) {
LOGGER.warn("Connection has something wrong. So will be disconnected.");
disconnect();
}
@@ -86,8 +90,13 @@
return ch;
}
+ @Override
public void connect() {
- LOGGER.info("Connect to {}...", properties.getString(Keys.AMQP_URI));
+ if (connection != null && connection.isOpen()) {
+ LOGGER.info(MSG("Already connected."));
+ return;
+ }
+ LOGGER.info(MSG("Connect to {}..."), properties.getString(Keys.AMQP_URI));
ConnectionFactory factory = new ConnectionFactory();
try {
if (StringUtils.isNotEmpty(properties.getString(Keys.AMQP_URI))) {
@@ -100,44 +109,46 @@
}
connection = factory.newConnection();
connection.addShutdownListener(this);
- LOGGER.info("Connection established.");
+ LOGGER.info(MSG("Connection established."));
}
} catch (URISyntaxException ex) {
- LOGGER.error("URI syntax error: {}", properties.getString(Keys.AMQP_URI));
+ LOGGER.error(MSG("URI syntax error: {}"), properties.getString(Keys.AMQP_URI));
} catch (IOException ex) {
- LOGGER.error("Connection cannot be opened.");
+ LOGGER.error(MSG("Connection cannot be opened."));
} catch (Exception ex) {
- LOGGER.warn("Connection has something error. it will be disposed.", ex);
+ LOGGER.warn(MSG("Connection has something error. it will be disposed."), ex);
}
}
+ @Override
public void disconnect() {
- LOGGER.info("Disconnecting...");
+ LOGGER.info(MSG("Disconnecting..."));
try {
if (connection != null) {
connection.close();
}
} catch (Exception ex) {
- LOGGER.warn("Error when close connection." , ex);
+ LOGGER.warn(MSG("Error when close connection.") , ex);
} finally {
connection = null;
channel = null;
}
}
- public void publishMessage(String message) {
+ @Override
+ public void publish(String message) {
if (channel == null || !channel.isOpen()) {
channel = getChannel();
}
if (channel != null && channel.isOpen()) {
try {
- LOGGER.debug("Send message.");
+ LOGGER.debug(MSG("Send message."));
channel.basicPublish(properties.getString(Keys.EXCHANGE_NAME),
properties.getString(Keys.MESSAGE_ROUTINGKEY),
properties.getAMQProperties().getBasicProperties(),
message.getBytes(CharEncoding.UTF_8));
} catch (Exception ex) {
- LOGGER.warn("Error when sending meessage.", ex);
+ LOGGER.warn(MSG("Error when sending meessage."), ex);
}
}
}
@@ -149,11 +160,11 @@
if (obj instanceof Channel) {
Channel ch = (Channel) obj;
if (ch.equals(channel)) {
- LOGGER.info("Publish channel closed.");
+ LOGGER.info(MSG("Publish channel closed."));
channel = null;
}
} else if (obj instanceof Connection) {
- LOGGER.info("Connection disconnected.");
+ LOGGER.info(MSG("Connection disconnected."));
connection = null;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/BCSolver.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/BCSolver.java
index 281f6e2..8662655 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/BCSolver.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/BCSolver.java
@@ -27,11 +27,11 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Path;
-@Singleton
-public class BCSolver {
+public class BCSolver implements Solver {
private final static String DEFAULT_SITE_NAME = "default";
private static final Logger LOGGER = LoggerFactory.getLogger(BCSolver.class);
@@ -66,8 +66,7 @@
Files.createDirectories(siteDir);
Files.move(oldFile, newFile);
Files.createFile(siteDir.resolve(DEFAULT_SITE_NAME + FILE_EXT));
- } catch (IOException iex) {
- LOGGER.warn("{}", iex);
+ } catch (Exception ex) {
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/Solver.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/Solver.java
new file mode 100644
index 0000000..98d5cf1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/Solver.java
@@ -0,0 +1,17 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+package com.googlesource.gerrit.plugins.rabbitmq.solver;
+
+public interface Solver {
+ public void solve();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/SolverFactory.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/SolverFactory.java
new file mode 100644
index 0000000..e34f98d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/solver/SolverFactory.java
@@ -0,0 +1,17 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+package com.googlesource.gerrit.plugins.rabbitmq.solver;
+
+public interface SolverFactory {
+ public Solver create();
+}