Format Java files with google-java-format
Change-Id: Icf57b8d72c94abb9be565e41718a3406d6262e8e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
index c26685b..d4f1c59 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
@@ -19,7 +19,6 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.PropertiesFactory;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
@@ -28,10 +27,6 @@
import com.googlesource.gerrit.plugins.rabbitmq.worker.DefaultEventWorker;
import com.googlesource.gerrit.plugins.rabbitmq.worker.EventWorker;
import com.googlesource.gerrit.plugins.rabbitmq.worker.EventWorkerFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
@@ -39,6 +34,8 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Singleton
public class Manager implements LifecycleListener {
@@ -109,7 +106,8 @@
base.load();
// Load sites
- try (DirectoryStream<Path> ds = Files.newDirectoryStream(pluginDataDir.resolve(SITE_DIR), "*" + FILE_EXT)) {
+ try (DirectoryStream<Path> ds =
+ Files.newDirectoryStream(pluginDataDir.resolve(SITE_DIR), "*" + FILE_EXT)) {
for (Path configFile : ds) {
Properties site = propFactory.create(configFile);
if (site.load(base)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 762d78e..2ecdec3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -22,7 +22,6 @@
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
-
import com.googlesource.gerrit.plugins.rabbitmq.config.PluginProperties;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.PropertiesFactory;
@@ -56,9 +55,18 @@
sectionBinder.addBinding().to(Message.class);
sectionBinder.addBinding().to(Monitor.class);
- install(new FactoryModuleBuilder().implement(Publisher.class, MessagePublisher.class).build(PublisherFactory.class));
- install(new FactoryModuleBuilder().implement(Properties.class, PluginProperties.class).build(PropertiesFactory.class));
- install(new FactoryModuleBuilder().implement(EventWorker.class, UserEventWorker.class).build(EventWorkerFactory.class));
+ install(
+ new FactoryModuleBuilder()
+ .implement(Publisher.class, MessagePublisher.class)
+ .build(PublisherFactory.class));
+ install(
+ new FactoryModuleBuilder()
+ .implement(Properties.class, PluginProperties.class)
+ .build(PropertiesFactory.class));
+ install(
+ new FactoryModuleBuilder()
+ .implement(EventWorker.class, UserEventWorker.class)
+ .build(EventWorkerFactory.class));
bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/annotation/Limit.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/annotation/Limit.java
index 4c33219..60c3e01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/annotation/Limit.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/annotation/Limit.java
@@ -22,5 +22,6 @@
@Retention(RetentionPolicy.RUNTIME)
public @interface Limit {
int max() default -1;
+
int min() default -1;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
index ceaef0a..ef2660c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
@@ -15,20 +15,17 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
import com.google.gerrit.common.TimeUtil;
-
import com.googlesource.gerrit.plugins.rabbitmq.annotation.MessageHeader;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Message;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Section;
import com.rabbitmq.client.AMQP;
-
-import org.apache.commons.codec.CharEncoding;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.lang.reflect.Field;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.codec.CharEncoding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQProperties {
@@ -52,7 +49,7 @@
if (value == null) {
continue;
}
- switch(f.getType().getSimpleName()) {
+ switch (f.getType().getSimpleName()) {
case "String":
headers.put(mh.value(), value.toString());
break;
@@ -69,8 +66,7 @@
break;
}
} catch (IllegalAccessException | IllegalArgumentException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}",
- f.getName(), ex.getMessage());
+ LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
}
}
}
@@ -79,13 +75,13 @@
public AMQP.BasicProperties getBasicProperties() {
return new AMQP.BasicProperties.Builder()
- .appId(EVENT_APPID)
- .contentEncoding(CharEncoding.UTF_8)
- .contentType(CONTENT_TYPE_JSON)
- .deliveryMode(message.deliveryMode)
- .priority(message.priority)
- .headers(headers)
- .timestamp(new Date(TimeUtil.nowMs()))
- .build();
+ .appId(EVENT_APPID)
+ .contentEncoding(CharEncoding.UTF_8)
+ .contentType(CONTENT_TYPE_JSON)
+ .deliveryMode(message.deliveryMode)
+ .priority(message.priority)
+ .headers(headers)
+ .timestamp(new Date(TimeUtil.nowMs()))
+ .build();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
index 09661ca..7577815 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
@@ -16,12 +16,15 @@
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
-
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Section;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Sections;
-
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
@@ -30,12 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.Set;
-
public class PluginProperties implements Properties {
private static final Logger LOGGER = LoggerFactory.getLogger(PluginProperties.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
index 635d3ad..476a198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/Properties.java
@@ -1,19 +1,24 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Section;
-
-import org.eclipse.jgit.lib.Config;
-
import java.nio.file.Path;
import java.util.Set;
+import org.eclipse.jgit.lib.Config;
public interface Properties extends Cloneable {
Config toConfig();
+
boolean load();
+
boolean load(Properties baseProperties);
+
Path getPath();
+
String getName();
+
Set<Section> getSections();
+
<T extends Section> T getSection(Class<T> clazz);
+
AMQProperties getAMQProperties();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PropertiesFactory.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PropertiesFactory.java
index a82ebac..40d8d3c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PropertiesFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PropertiesFactory.java
@@ -18,5 +18,6 @@
public interface PropertiesFactory {
Properties create();
+
Properties create(Path propertiesFile);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Gerrit.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Gerrit.java
index b400b76..a7456dc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Gerrit.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Gerrit.java
@@ -17,10 +17,8 @@
import com.google.gerrit.common.Version;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.inject.Inject;
-
import com.googlesource.gerrit.plugins.rabbitmq.annotation.Default;
import com.googlesource.gerrit.plugins.rabbitmq.annotation.MessageHeader;
-
import org.eclipse.jgit.lib.Config;
public class Gerrit implements Section {
@@ -48,8 +46,7 @@
@MessageHeader("gerrit-version")
public String version;
- @Default
- public String listenAs;
+ @Default public String listenAs;
public String getAMQPUserPassword(String userName) {
return gerritConfig.getString("AMQP", userName, "password");
@@ -61,5 +58,4 @@
this.version = Version.getVersion();
this.gerritConfig = config;
}
-
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Message.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Message.java
index 4760c4c..b23a6e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Message.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Message.java
@@ -24,6 +24,5 @@
@Default("0")
public Integer priority;
- @Default
- public String routingKey;
+ @Default public String routingKey;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Monitor.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Monitor.java
index 6f91648..b603c4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Monitor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Monitor.java
@@ -20,7 +20,7 @@
public class Monitor implements Section {
@Default("15000")
- @Limit(min=5000)
+ @Limit(min = 5000)
public Integer interval;
@Default("15")
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Section.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Section.java
index dcb0a9a..87581a8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Section.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Section.java
@@ -14,5 +14,4 @@
package com.googlesource.gerrit.plugins.rabbitmq.config.section;
-public interface Section {
-}
+public interface Section {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Sections.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Sections.java
index ba6f220..0151607 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Sections.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/section/Sections.java
@@ -16,14 +16,12 @@
import com.googlesource.gerrit.plugins.rabbitmq.annotation.Default;
import com.googlesource.gerrit.plugins.rabbitmq.annotation.Limit;
-
+import java.lang.reflect.Field;
+import java.util.Set;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
-import java.util.Set;
-
public final class Sections {
private static final Logger LOGGER = LoggerFactory.getLogger(Sections.class);
@@ -49,8 +47,7 @@
}
}
} catch (IllegalAccessException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}",
- f.getName(), ex.getMessage());
+ LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
}
}
return section;
@@ -78,8 +75,7 @@
}
}
} catch (IllegalAccessException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}",
- f.getName(), ex.getMessage());
+ LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
}
}
return config;
@@ -106,8 +102,7 @@
}
}
} catch (IllegalAccessException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}",
- f.getName(), ex.getMessage());
+ LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
}
}
}
@@ -134,8 +129,7 @@
}
}
} catch (IllegalAccessException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}",
- f.getName(), ex.getMessage());
+ LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
}
}
return section;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
index 9aa38ed..fcc4e26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
@@ -24,7 +24,6 @@
@Override
public Gson get() {
- return new GsonBuilder().registerTypeAdapter(Supplier.class,
- new SupplierSerializer()).create();
+ return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 48465bc..607f4d2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -17,25 +17,22 @@
import com.google.gerrit.common.EventListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.AMQP;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactoryProvider;
-import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessagePublisher implements Publisher, LifecycleListener {
@@ -50,8 +47,7 @@
private final Timer monitorTimer = new Timer();
private boolean available = true;
private EventListener eventListener;
- private final LinkedBlockingQueue<Event> queue =
- new LinkedBlockingQueue<>(MAX_EVENTS);
+ private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
private CancelableRunnable publisher;
private Thread publisherThread;
@@ -63,58 +59,60 @@
this.session = sessionFactoryProvider.get().create(properties);
this.properties = properties;
this.gson = gson;
- this.eventListener = new EventListener() {
- @Override
- public void onEvent(Event event) {
- try {
- if (!publisherThread.isAlive()) {
- publisherThread.start();
- }
- queue.put(event);
- } catch (InterruptedException e) {
- LOGGER.warn("Failed to queue event", e);
- }
- }
- };
- this.publisher = new CancelableRunnable() {
-
- boolean canceled = false;
-
- @Override
- public void run() {
- while (!canceled) {
- try {
- if (isEnable() && session.isOpen()) {
- Event event = queue.poll(200, TimeUnit.MILLISECONDS);
- if (event != null) {
- if (isEnable() && session.isOpen()) {
- publishEvent(event);
- } else {
- queue.put(event);
- }
+ this.eventListener =
+ new EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ try {
+ if (!publisherThread.isAlive()) {
+ publisherThread.start();
}
- } else {
- Thread.sleep(1000);
+ queue.put(event);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Failed to queue event", e);
}
- } catch (InterruptedException e) {
- LOGGER.warn("Interupted while taking event", e);
}
- }
- }
+ };
+ this.publisher =
+ new CancelableRunnable() {
- @Override
- public void cancel() {
- this.canceled = true;
- }
+ boolean canceled = false;
- @Override
- public String toString() {
- return "Rabbitmq publisher: "
- + properties.getSection(Gerrit.class).listenAs
- + "-"
- + properties.getSection(AMQP.class).uri;
- }
- };
+ @Override
+ public void run() {
+ while (!canceled) {
+ try {
+ if (isEnable() && session.isOpen()) {
+ Event event = queue.poll(200, TimeUnit.MILLISECONDS);
+ if (event != null) {
+ if (isEnable() && session.isOpen()) {
+ publishEvent(event);
+ } else {
+ queue.put(event);
+ }
+ }
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interupted while taking event", e);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.canceled = true;
+ }
+
+ @Override
+ public String toString() {
+ return "Rabbitmq publisher: "
+ + properties.getSection(Gerrit.class).listenAs
+ + "-"
+ + properties.getSection(AMQP.class).uri;
+ }
+ };
}
@Override
@@ -123,15 +121,18 @@
publisherThread.start();
if (!session.isOpen()) {
session.connect();
- monitorTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (!session.isOpen()) {
- LOGGER.info("#start: try to reconnect");
- session.connect();
- }
- }
- }, MONITOR_FIRSTTIME_DELAY, properties.getSection(Monitor.class).interval);
+ monitorTimer.schedule(
+ new TimerTask() {
+ @Override
+ public void run() {
+ if (!session.isOpen()) {
+ LOGGER.info("#start: try to reconnect");
+ session.connect();
+ }
+ }
+ },
+ MONITOR_FIRSTTIME_DELAY,
+ properties.getSection(Monitor.class).interval);
available = true;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index b729dd3..b74b94c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -1,18 +1,25 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
import com.google.gerrit.common.EventListener;
-
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
public interface Publisher {
void start();
+
void stop();
+
void enable();
+
void disable();
+
boolean isEnable();
+
Session getSession();
+
Properties getProperties();
+
String getName();
+
EventListener getEventListener();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 5aa03fb..180aeda 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -15,7 +15,10 @@
public interface Session {
boolean isOpen();
+
void connect();
+
void disconnect();
+
void publish(String message);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index d5389dc..d0a872b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -27,18 +27,16 @@
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownNotifier;
import com.rabbitmq.client.ShutdownSignalException;
-
-import org.apache.commons.codec.CharEncoding;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.codec.CharEncoding;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class AMQPSession implements Session {
@@ -65,8 +63,8 @@
if (cause.isInitiatedByApplication()) {
LOGGER.info(MSG("Channel #{} closed by application."), ch.getChannelNumber());
} else {
- LOGGER.warn(MSG("Channel #{} closed. Cause: {}"),
- ch.getChannelNumber(), cause.getMessage());
+ LOGGER.warn(
+ MSG("Channel #{} closed. Cause: {}"), ch.getChannelNumber(), cause.getMessage());
}
if (ch.equals(AMQPSession.this.channel)) {
AMQPSession.this.channel = null;
@@ -205,7 +203,9 @@
Exchange exchange = properties.getSection(Exchange.class);
try {
LOGGER.debug(MSG("Sending message."));
- channel.basicPublish(exchange.name, message.routingKey,
+ channel.basicPublish(
+ exchange.name,
+ message.routingKey,
properties.getAMQProperties().getBasicProperties(),
messageBody.getBytes(CharEncoding.UTF_8));
} catch (IOException ex) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
index 5241ce3..5ac8764 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
@@ -17,14 +17,11 @@
import com.google.gerrit.common.EventListener;
import com.google.gerrit.server.events.Event;
import com.google.inject.Singleton;
-
import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Singleton
public class DefaultEventWorker implements EventListener, EventWorker {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
index 1a46663..e25d3c6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
@@ -4,7 +4,10 @@
public interface EventWorker {
void addPublisher(Publisher publisher);
+
void addPublisher(Publisher publisher, String userName);
+
void removePublisher(Publisher publisher);
+
void clear();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
index a919dbd..324215d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
@@ -32,14 +32,11 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
-
import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UserEventWorker implements EventWorker {
@@ -80,75 +77,81 @@
@Override
public void addPublisher(final Publisher publisher, final String userName) {
- workQueue.getDefaultQueue().submit(new Runnable() {
- private ReviewDb db;
- private Account userAccount;
-
- @Override
- public void run() {
- RequestContext old = threadLocalRequestContext
- .setContext(new RequestContext() {
+ workQueue
+ .getDefaultQueue()
+ .submit(
+ new Runnable() {
+ private ReviewDb db;
+ private Account userAccount;
@Override
- public CurrentUser getUser() {
- return pluginUser;
- }
+ public void run() {
+ RequestContext old =
+ threadLocalRequestContext.setContext(
+ new RequestContext() {
- @Override
- public Provider<ReviewDb> getReviewDbProvider() {
- return new Provider<ReviewDb>() {
- @Override
- public ReviewDb get() {
- if (db == null) {
- try {
- db = schemaFactory.open();
- } catch (OrmException e) {
- throw new ProvisionException("Cannot open ReviewDb", e);
- }
- }
- return db;
+ @Override
+ public CurrentUser getUser() {
+ return pluginUser;
+ }
+
+ @Override
+ public Provider<ReviewDb> getReviewDbProvider() {
+ return new Provider<ReviewDb>() {
+ @Override
+ public ReviewDb get() {
+ if (db == null) {
+ try {
+ db = schemaFactory.open();
+ } catch (OrmException e) {
+ throw new ProvisionException("Cannot open ReviewDb", e);
+ }
+ }
+ return db;
+ }
+ };
+ }
+ });
+ try {
+ userAccount = accountResolver.find(db, userName);
+ if (userAccount == null) {
+ LOGGER.error(
+ "No single user could be found when searching for listenAs: {}", userName);
+ return;
}
- };
+ final IdentifiedUser user = userFactory.create(userAccount.getId());
+ RegistrationHandle registration =
+ eventListeners.add(
+ new UserScopedEventListener() {
+ @Override
+ public void onEvent(Event event) {
+ publisher.getEventListener().onEvent(event);
+ }
+
+ @Override
+ public CurrentUser getUser() {
+ return user;
+ }
+ });
+ eventListenerRegistrations.put(publisher, registration);
+ LOGGER.info("Listen events as : {}", userName);
+ } catch (OrmException e) {
+ LOGGER.error("Could not query database for listenAs", e);
+ return;
+ } finally {
+ threadLocalRequestContext.setContext(old);
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
}
});
- try {
- userAccount = accountResolver.find(db, userName);
- if (userAccount == null) {
- LOGGER.error("No single user could be found when searching for listenAs: {}", userName);
- return;
- }
- final IdentifiedUser user = userFactory.create(userAccount.getId());
- RegistrationHandle registration =
- eventListeners.add(new UserScopedEventListener() {
- @Override
- public void onEvent(Event event) {
- publisher.getEventListener().onEvent(event);
- }
- @Override
- public CurrentUser getUser() {
- return user;
- }
- });
- eventListenerRegistrations.put(publisher, registration);
- LOGGER.info("Listen events as : {}", userName);
- } catch (OrmException e) {
- LOGGER.error("Could not query database for listenAs", e);
- return;
- } finally {
- threadLocalRequestContext.setContext(old);
- if (db != null) {
- db.close();
- db = null;
- }
- }
- }
- });
}
@Override
public void removePublisher(final Publisher publisher) {
- RegistrationHandle registration =
- eventListenerRegistrations.remove(publisher);
+ RegistrationHandle registration = eventListenerRegistrations.remove(publisher);
if (registration != null) {
registration.remove();
}