Roughly support multi config
This is the first implementation for Issue #11.
* New config location: data/rabbitmq/.
* Existing config will be migrated automatically.
* data/rabbitmq/rabbitmq.config is base config.
* Actual session will not be established.
* Actual config should be stored into data/rabbitmq/site/.
* By migration, empty file `default.config` will be created.
This patch might be buggy and log is unkindly.
Especially reconnection and restart have not been tested.
Please feel free to contribute patches :)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
index 4cf2968..7b81f0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQPSession.java
@@ -16,6 +16,8 @@
import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
// import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -35,18 +37,25 @@
public class AMQPSession implements ShutdownListener {
+ interface Factory {
+ AMQPSession create(Properties properties);
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(AMQPSession.class);
private final Properties properties;
private volatile Connection connection;
- private volatile Channel publishChannel;
- private volatile Channel consumeChannel;
+ private volatile Channel channel;
private volatile int failureCount = 0;
@Inject
- public AMQPSession(Properties properties) {
+ public AMQPSession(@Assisted Properties properties) {
this.properties = properties;
}
+ public Properties getProperties() {
+ return properties;
+ }
+
public boolean isOpen() {
if (connection != null) {
return true;
@@ -92,8 +101,6 @@
connection.addShutdownListener(this);
LOGGER.info("Connection established.");
}
- //TODO: Consume review
- // setupConsumer();
} catch (URISyntaxException ex) {
LOGGER.error("URI syntax error: {}", properties.getString(Keys.AMQP_URI));
} catch (IOException ex) {
@@ -103,20 +110,6 @@
}
}
-//TODO: Consume review
-/*
- private void setupConsumer() {
- if (connection != null) {
- if (properties.getBoolean(Keys.QUEUE_CONSUME)) {
- if (StringUtils.isNotEmpty(properties.getString(Keys.QUEUE_NAME))) {
- consumeMessage();
- }
- }
- LOGGER.info("Complete to setup channel.");
- }
- }
-*/
-
public void disconnect() {
LOGGER.info("Disconnecting...");
try {
@@ -127,20 +120,20 @@
LOGGER.warn("Error when close connection." , ex);
} finally {
connection = null;
- publishChannel = null;
+ channel = null;
}
}
public void publishMessage(String message) {
- if (publishChannel == null || !publishChannel.isOpen()) {
- publishChannel = getChannel();
+ if (channel == null || !channel.isOpen()) {
+ channel = getChannel();
}
- if (publishChannel != null && publishChannel.isOpen()) {
+ if (channel != null && channel.isOpen()) {
try {
LOGGER.debug("Send message.");
- publishChannel.basicPublish(properties.getString(Keys.EXCHANGE_NAME),
+ channel.basicPublish(properties.getString(Keys.EXCHANGE_NAME),
properties.getString(Keys.MESSAGE_ROUTINGKEY),
- properties.getBasicProperties(),
+ properties.getAMQProperties().getBasicProperties(),
message.getBytes(CharEncoding.UTF_8));
} catch (Exception ex) {
LOGGER.warn("Error when sending meessage.", ex);
@@ -148,71 +141,19 @@
}
}
-// TODO: Consume review.
-/*
- public void consumeMessage() {
- if (consumeChannel == null || !consumeChannel.isOpen()) {
- consumeChannel = getChannel();
- }
- if (consumeChannel != null && consumeChannel.isOpen()) {
- try {
- consumeChannel.basicConsume(
- properties.getString(Keys.QUEUE_NAME),
- false,
- new MessageConsumer(consumeChannel));
- LOGGER.debug("Start consuming message.");
- } catch (Exception ex) {
- LOGGER.warn("Error when consuming message.", ex);
- }
- }
- }
-*/
@Override
public void shutdownCompleted(ShutdownSignalException exception) {
Object obj = exception.getReference();
if (obj instanceof Channel) {
Channel ch = (Channel) obj;
- if (ch.equals(publishChannel)) {
+ if (ch.equals(channel)) {
LOGGER.info("Publish channel closed.");
- publishChannel = null;
- } else if (ch.equals(consumeChannel)) {
- LOGGER.info("Consume channel closed.");
- consumeChannel = null;
+ channel = null;
}
} else if (obj instanceof Connection) {
LOGGER.info("Connection disconnected.");
connection = null;
}
}
-
-// TODO: Consume review.
-/*
- public class MessageConsumer extends DefaultConsumer {
-
- public MessageConsumer(Channel channel) {
- super(channel);
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties props, byte[] body)
- throws IOException {
- try {
- long deliveryTag = envelope.getDeliveryTag();
-
- if (Properties.APPROVE_APPID.equals(props.getAppId()) &&
- Properties.CONTENT_TYPE_JSON.equals(props.getContentType())) {
- // TODO: Get message then input as review. required 2.9 or later.
- }
-
- getChannel().basicAck(deliveryTag, false);
-
- } catch (IOException ex) {
- throw ex;
- } catch (RuntimeException ex) {
- LOGGER.warn("caught exception in delivery handler", ex);
- }
- }
- }
-*/
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQProperties.java
new file mode 100644
index 0000000..4c8f7c2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/AMQProperties.java
@@ -0,0 +1,70 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import com.rabbitmq.client.AMQP;
+
+import org.apache.commons.codec.CharEncoding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AMQProperties {
+
+ interface Factory {
+ AMQProperties create(Properties properties);
+ }
+
+ public final static String EVENT_APPID = "gerrit";
+ public final static String CONTENT_TYPE_JSON = "application/json";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AMQProperties.class);
+
+ private final Properties properties;
+ private AMQP.BasicProperties amqpProperties;
+
+ @Inject
+ public AMQProperties(@Assisted Properties properties) {
+ this.properties = properties;
+ }
+
+ public AMQP.BasicProperties getBasicProperties() {
+ if (amqpProperties == null) {
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(Keys.GERRIT_NAME.key, properties.getString(Keys.GERRIT_NAME));
+ headers.put(Keys.GERRIT_HOSTNAME.key, properties.getString(Keys.GERRIT_HOSTNAME));
+ headers.put(Keys.GERRIT_SCHEME.key, properties.getString(Keys.GERRIT_SCHEME));
+ headers.put(Keys.GERRIT_PORT.key, String.valueOf(properties.getInt(Keys.GERRIT_PORT)));
+ headers.put(Keys.GERRIT_FRONT_URL.key, properties.getGerritFrontUrl());
+ headers.put(Keys.GERRIT_VERSION.key, properties.getGerritVersion());
+
+ AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
+ builder.appId(EVENT_APPID);
+ builder.contentEncoding(CharEncoding.UTF_8);
+ builder.contentType(CONTENT_TYPE_JSON);
+ builder.deliveryMode(properties.getInt(Keys.MESSAGE_DELIVERY_MODE));
+ builder.priority(properties.getInt(Keys.MESSAGE_PRIORITY));
+ builder.headers(headers);
+
+ amqpProperties = builder.build();
+ }
+ return amqpProperties;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BCSolver.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BCSolver.java
new file mode 100644
index 0000000..84cf2cc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BCSolver.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import static com.googlesource.gerrit.plugins.rabbitmq.PropertiesStore.FILE_EXT;
+import static com.googlesource.gerrit.plugins.rabbitmq.PropertiesStore.SITE_DIR;
+
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+@Singleton
+public class BCSolver {
+
+ private final static String DEFAULT_SITE_NAME = "default";
+ private static final Logger LOGGER = LoggerFactory.getLogger(BCSolver.class);
+
+ private final String pluginName;
+ private final Path pluginDataDir;
+ private final Path etcDir;
+
+ @Inject
+ public BCSolver(
+ @PluginName final String pluginName,
+ @PluginData final File pluginData,
+ final SitePaths sites
+ ) {
+ this.pluginName = pluginName;
+ this.pluginDataDir = pluginData.toPath();
+ this.etcDir = sites.etc_dir.toPath();
+ }
+
+ /**
+ * old : etc/rabbitmq.config
+ *
+ * new : data/rabbitmq/rabbitmq.config
+ * data/rabbitmq/site/default.config
+ */
+ void solve() {
+ try {
+ Path oldFile = etcDir.resolve(pluginName + FILE_EXT);
+ Path newFile = pluginDataDir.resolve(pluginName + FILE_EXT);
+ Path siteDir = pluginDataDir.resolve(SITE_DIR);
+
+ Files.createDirectories(siteDir);
+ Files.move(oldFile, newFile);
+ Files.createFile(siteDir.resolve(DEFAULT_SITE_NAME + FILE_EXT));
+ } catch (IOException iex) {
+ LOGGER.warn("{}", iex);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
index d5b94dc..fb93634 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/DefaultMessagePublisher.java
@@ -24,57 +24,77 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.CopyOnWriteArraySet;
@Singleton
public class DefaultMessagePublisher implements ChangeListener, LifecycleListener {
+ public static class SessionEntry {
+ public AMQPSession session;
+ public Timer monitorTimer;
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessagePublisher.class);
private final static int MONITOR_FIRSTTIME_DELAY = 15000;
- private final Properties properties;
- private final AMQPSession session;
+ private final Set<SessionEntry> sessionEntries = new CopyOnWriteArraySet<>();
private final Gson gson;
- private final Timer monitorTimer = new Timer();
@Inject
public DefaultMessagePublisher(
- final Properties properties,
- final AMQPSession session,
final Gson gson) {
- this.properties = properties;
- this.session = session;
this.gson = gson;
}
+ private void openSession(final SessionEntry entry) {
+ if (!entry.session.isOpen()) {
+ entry.session.connect();
+ entry.monitorTimer = new Timer();
+ entry.monitorTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (!entry.session.isOpen()) {
+ LOGGER.info("#start: try to reconnect");
+ entry.session.connect();
+ }
+ }
+ }, MONITOR_FIRSTTIME_DELAY, entry.session.getProperties().getInt(Keys.MONITOR_INTERVAL));
+ }
+ }
+ public void addSession(final AMQPSession session) {
+ SessionEntry entry = new SessionEntry();
+ entry.session = session;
+ openSession(entry);
+ sessionEntries.add(entry);
+ }
+
@Override
public void start() {
LOGGER.info("Start default listener.");
- session.connect();
- monitorTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (!session.isOpen()) {
- LOGGER.info("#start: try to reconnect");
- session.connect();
- }
- }
- }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+ for (SessionEntry entry : sessionEntries) {
+ openSession(entry);
+ }
}
@Override
public void stop() {
LOGGER.info("Stop default listener.");
- monitorTimer.cancel();
- session.disconnect();
+ for (SessionEntry entry : sessionEntries) {
+ entry.monitorTimer.cancel();
+ entry.session.disconnect();
+ }
}
@Override
public void onChangeEvent(ChangeEvent event) {
- if (session.isOpen()) {
- session.publishMessage(gson.toJson(event));
+ for (SessionEntry entry : sessionEntries) {
+ if (entry.session.isOpen()) {
+ entry.session.publishMessage(gson.toJson(event));
+ }
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
index facf174..1be39a8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/MessagePublisher.java
@@ -33,7 +33,7 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
-import com.google.inject.Singleton;
+import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,14 +41,16 @@
import java.util.Timer;
import java.util.TimerTask;
-@Singleton
public class MessagePublisher implements ChangeListener, LifecycleListener {
+ interface Factory {
+ MessagePublisher create(AMQPSession session);
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
private final static int MONITOR_FIRSTTIME_DELAY = 15000;
- private final Properties properties;
private final AMQPSession session;
private final ChangeHooks hooks;
private final Gson gson;
@@ -64,8 +66,7 @@
@Inject
public MessagePublisher(
- Properties properties,
- AMQPSession session,
+ @Assisted AMQPSession session,
ChangeHooks hooks,
Gson gson,
WorkQueue workQueue,
@@ -74,7 +75,6 @@
ThreadLocalRequestContext threadLocalRequestContext,
PluginUser pluginUser,
SchemaFactory<ReviewDb> schemaFactory) {
- this.properties = properties;
this.session = session;
this.hooks = hooks;
this.gson = gson;
@@ -98,10 +98,10 @@
session.connect();
}
}
- }, MONITOR_FIRSTTIME_DELAY, properties.getInt(Keys.MONITOR_INTERVAL));
+ }, MONITOR_FIRSTTIME_DELAY, session.getProperties().getInt(Keys.MONITOR_INTERVAL));
- if (properties.hasListenAs()) {
- final String userName = properties.getListenAs();
+ if (session.getProperties().hasListenAs()) {
+ final String userName = session.getProperties().getListenAs();
final ChangeListener changeListener = this;
workQueue.getDefaultQueue().submit(new Runnable() {
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index c8dadbe..2d596e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -18,17 +18,25 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
class Module extends AbstractModule {
@Override
protected void configure() {
- bind(AMQPSession.class);
- bind(Properties.class);
- bind(MessagePublisher.class);
+ install(new FactoryModuleBuilder().build(Properties.Factory.class));
+ install(new FactoryModuleBuilder().build(AMQProperties.Factory.class));
+ install(new FactoryModuleBuilder().build(AMQPSession.Factory.class));
+ install(new FactoryModuleBuilder().build(MessagePublisher.Factory.class));
+ bind(PropertiesStore.class);
+ bind(BCSolver.class);
+// bind(AMQPSession.class);
+// bind(Properties.class);
+// bind(MessagePublisher.class);
bind(DefaultMessagePublisher.class);
bind(RabbitMQManager.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(RabbitMQManager.class);
+ DynamicSet.bind(binder(), LifecycleListener.class).to(DefaultMessagePublisher.class);
DynamicSet.bind(binder(), ChangeListener.class).to(DefaultMessagePublisher.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
index f03c347..0e0bd12 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Properties.java
@@ -15,15 +15,11 @@
package com.googlesource.gerrit.plugins.rabbitmq;
import com.google.gerrit.common.Version;
-import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.server.config.GerritServerConfig;
-import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
+import com.google.inject.assistedinject.Assisted;
-import com.rabbitmq.client.AMQP;
-
-import org.apache.commons.codec.CharEncoding;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
@@ -32,53 +28,77 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.nio.file.Files;
+import java.nio.file.Path;
-@Singleton
public class Properties {
- public final static String EVENT_APPID = "gerrit";
- public final static String CONTENT_TYPE_JSON = "application/json";
+ interface Factory {
+ Properties create(Path propertiesFile);
+ }
private static final Logger LOGGER = LoggerFactory.getLogger(Properties.class);
- private final static String CONFIG_FILEEXT = ".config";
private final static int MINIMUM_CONNECTION_MONITOR_INTERVAL = 5000;
- private final Config config;
- private final Config pluginConfig;
- private AMQP.BasicProperties properties;
+ private final Path propertiesFile;
+ private final Config serverConfig;
+ private final AMQProperties.Factory amqFactory;
+ private Config pluginConfig;
+ private AMQProperties amqProperties;
@Inject
- public Properties(@PluginName final String pluginName, final SitePaths site, @GerritServerConfig final Config config) {
- this.config = config;
- this.pluginConfig = getPluginConfig(new File(site.etc_dir, pluginName + CONFIG_FILEEXT));
- this.properties = generateBasicProperties();
+ public Properties(
+ @GerritServerConfig final Config serverConfig,
+ final AMQProperties.Factory amqFactory,
+ @Assisted final Path propertiesFile) {
+ this.propertiesFile = propertiesFile;
+ this.serverConfig = serverConfig;
+ this.amqFactory = amqFactory;
}
- public Config getPluginConfig(File cfgPath) {
- LOGGER.info("Loading " + cfgPath.toString() + " ...");
- FileBasedConfig cfg = new FileBasedConfig(cfgPath, FS.DETECTED);
- if (!cfg.getFile().exists()) {
- LOGGER.warn("No " + cfg.getFile());
- return cfg;
- }
- if (cfg.getFile().length() == 0) {
- LOGGER.info("Empty " + cfg.getFile());
- return cfg;
+ public boolean load() {
+ return load(null);
+ }
+
+ public boolean load(Properties baseProperties) {
+ pluginConfig = new Config();
+ LOGGER.info("Loading {} ...", propertiesFile);
+ if (!Files.exists(propertiesFile)) {
+ LOGGER.warn("No {}", propertiesFile);
+ return false;
}
+ FileBasedConfig cfg;
try {
+ if (baseProperties != null) {
+ cfg = new FileBasedConfig(baseProperties.getConfig(), propertiesFile.toFile(), FS.DETECTED);
+ } else {
+ cfg = new FileBasedConfig(propertiesFile.toFile(), FS.DETECTED);
+ }
cfg.load();
} catch (ConfigInvalidException e) {
- LOGGER.info("Config file " + cfg.getFile() + " is invalid: " + e.getMessage());
+ LOGGER.info("{} has invalid format: {}", propertiesFile, e.getMessage());
+ return false;
} catch (IOException e) {
- LOGGER.info("Cannot read " + cfg.getFile() + ": " + e.getMessage());
+ LOGGER.info("Cannot read {}: {}", propertiesFile, e.getMessage());
+ return false;
}
- return cfg;
+ pluginConfig = cfg;
+ return true;
+ }
+
+ public Config getConfig() {
+ return pluginConfig;
+ }
+
+ public Path getPath() {
+ return propertiesFile;
+ }
+
+ public String getName() {
+ return FilenameUtils.removeExtension(propertiesFile.getFileName().toString());
}
public String getString(Keys key) {
@@ -98,7 +118,8 @@
}
public String getGerritFrontUrl() {
- return StringUtils.stripToEmpty(config.getString(Keys.GERRIT_FRONT_URL.section, null, Keys.GERRIT_FRONT_URL.name));
+ return StringUtils.stripToEmpty(serverConfig.getString(
+ Keys.GERRIT_FRONT_URL.section, null, Keys.GERRIT_FRONT_URL.name));
}
public boolean hasListenAs() {
@@ -122,27 +143,10 @@
return interval;
}
- private AMQP.BasicProperties generateBasicProperties() {
- Map<String, Object> headers = new HashMap<>();
- headers.put(Keys.GERRIT_NAME.key, getString(Keys.GERRIT_NAME));
- headers.put(Keys.GERRIT_HOSTNAME.key, getString(Keys.GERRIT_HOSTNAME));
- headers.put(Keys.GERRIT_SCHEME.key, getString(Keys.GERRIT_SCHEME));
- headers.put(Keys.GERRIT_PORT.key, String.valueOf(getInt(Keys.GERRIT_PORT)));
- headers.put(Keys.GERRIT_FRONT_URL.key, getGerritFrontUrl());
- headers.put(Keys.GERRIT_VERSION.key, getGerritVersion());
-
- AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- builder.appId(EVENT_APPID);
- builder.contentEncoding(CharEncoding.UTF_8);
- builder.contentType(CONTENT_TYPE_JSON);
- builder.deliveryMode(getInt(Keys.MESSAGE_DELIVERY_MODE));
- builder.priority(getInt(Keys.MESSAGE_PRIORITY));
- builder.headers(headers);
-
- return builder.build();
- }
-
- public AMQP.BasicProperties getBasicProperties() {
- return properties;
+ public AMQProperties getAMQProperties() {
+ if (amqProperties == null) {
+ amqProperties = amqFactory.create(this);
+ }
+ return amqProperties;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java
new file mode 100644
index 0000000..75cd4fc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/PropertiesStore.java
@@ -0,0 +1,96 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.rabbitmq;
+
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+
+@Singleton
+public class PropertiesStore extends AbstractList<Properties> {
+
+ public static final String FILE_EXT = ".config";
+ public static final String SITE_DIR = "site";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesStore.class);
+
+
+ private final List<Properties> propertiesStore;
+ private final String pluginName;
+ private final Path pluginDataDir;
+ private final Properties.Factory propFactory;
+
+ @Inject
+ public PropertiesStore(
+ @PluginName final String pluginName,
+ @PluginData final File pluginData,
+ final Properties.Factory propFactory
+ ) {
+ this.propertiesStore = new ArrayList<>();
+ this.pluginName = pluginName;
+ this.pluginDataDir = pluginData.toPath();
+ this.propFactory = propFactory;
+ }
+
+ public void load() {
+ // Load base
+ Properties base = propFactory.create(pluginDataDir.resolve(pluginName + FILE_EXT));
+ base.load();
+
+ // Load site
+ try (DirectoryStream<Path> ds = Files.newDirectoryStream(pluginDataDir.resolve(SITE_DIR), "*" + FILE_EXT)) {
+ for (Path configFile : ds) {
+ Properties site = propFactory.create(configFile);
+ if (site.load(base)) {
+ propertiesStore.add(site);
+ }
+ }
+ } catch (IOException iex) {
+ LOGGER.warn(iex.getMessage());
+ }
+ }
+
+ @Override
+ public Properties get(int index) {
+ return propertiesStore.get(index);
+ }
+
+ public Properties get(String name) {
+ for (Properties p : propertiesStore) {
+ if (p.getName().equals(name)) {
+ return p;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return propertiesStore.size();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
index f9c8701..5aa6966 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMQManager.java
@@ -21,39 +21,55 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
@Singleton
public class RabbitMQManager implements LifecycleListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQManager.class);
- private final MessagePublisher messagePublisher;
private final DefaultMessagePublisher defaultMessagePublisher;
- private final Properties properties;
+ private final MessagePublisher.Factory publisherFactory;
+ private final AMQPSession.Factory sessionFactory;
+ private final PropertiesStore propertiesStore;
+ private final BCSolver bcSolver;
+ private final List<MessagePublisher> publisherList = new ArrayList<>();
@Inject
public RabbitMQManager(
- MessagePublisher messagePublisher,
DefaultMessagePublisher defaultMessagePublisher,
- Properties properties) {
- this.messagePublisher = messagePublisher;
+ MessagePublisher.Factory publisherFactory,
+ AMQPSession.Factory sessionFactory,
+ PropertiesStore propertiesStore,
+ BCSolver bcSolver) {
this.defaultMessagePublisher = defaultMessagePublisher;
- this.properties = properties;
+ this.publisherFactory = publisherFactory;
+ this.sessionFactory = sessionFactory;
+ this.propertiesStore = propertiesStore;
+ this.bcSolver = bcSolver;
}
@Override
public void start() {
- if (properties.hasListenAs()) {
- messagePublisher.start();
- } else {
- defaultMessagePublisher.start();
+ bcSolver.solve();
+ propertiesStore.load();
+ for (Properties properties : propertiesStore) {
+ AMQPSession session = sessionFactory.create(properties);
+ if (properties.hasListenAs()) {
+ MessagePublisher publisher = publisherFactory.create(session);
+ publisher.start();
+ publisherList.add(publisher);
+ } else {
+ defaultMessagePublisher.addSession(session);
+ }
}
}
@Override
public void stop() {
- if (properties.hasListenAs()) {
- messagePublisher.stop();
- } else {
- defaultMessagePublisher.stop();
+ for (MessagePublisher publisher : publisherList) {
+ publisher.stop();
}
+ publisherList.clear();
}
}