Merge branch 'stable-2.15' into stable-2.16
* stable-2.15:
Don't block plugin unload when no events available.
Catch ShutdownSignalException when closing connection
MessagePublisher: Threads cannot be restarted
Change-Id: I3100a053f0536b50bb2f7ef27c5cc8b918f78d20
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
index d4f1c59..55b0929 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Manager.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.rabbitmq;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.events.LifecycleListener;
@@ -34,13 +35,11 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Singleton
public class Manager implements LifecycleListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(Manager.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public static final String FILE_EXT = ".config";
public static final String SITE_DIR = "site";
@@ -77,7 +76,7 @@
publisher.start();
String listenAs = properties.getSection(Gerrit.class).listenAs;
if (!listenAs.isEmpty()) {
- userEventWorker.addPublisher(publisher, listenAs);
+ userEventWorker.addPublisher(pluginName, publisher, listenAs);
} else {
defaultEventWorker.addPublisher(publisher);
}
@@ -114,11 +113,11 @@
propList.add(site);
}
}
- } catch (IOException iex) {
- LOGGER.warn(iex.getMessage());
+ } catch (IOException ioe) {
+ logger.atWarning().withCause(ioe).log("%s", ioe.getMessage());
}
if (propList.isEmpty()) {
- LOGGER.warn("No site configs found. Using base config only!");
+ logger.atWarning().log("No site configs found. Using base config only!");
propList.add(base);
}
return propList;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index 2ecdec3..126a036 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -14,9 +14,9 @@
package com.googlesource.gerrit.plugins.rabbitmq;
-import com.google.gerrit.common.EventListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
index ef2660c..4ecf35e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/AMQProperties.java
@@ -14,7 +14,8 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
-import com.google.gerrit.common.TimeUtil;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.time.TimeUtil;
import com.googlesource.gerrit.plugins.rabbitmq.annotation.MessageHeader;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Message;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Section;
@@ -24,15 +25,13 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.codec.CharEncoding;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class AMQProperties {
public static final String EVENT_APPID = "gerrit";
public static final String CONTENT_TYPE_JSON = "application/json";
- private static final Logger LOGGER = LoggerFactory.getLogger(AMQProperties.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Message message;
private final Map<String, Object> headers;
@@ -66,7 +65,8 @@
break;
}
} catch (IllegalAccessException | IllegalArgumentException ex) {
- LOGGER.warn("Cannot access field {}. Cause: {}", f.getName(), ex.getMessage());
+ logger.atWarning().log(
+ "Cannot access field %s. Cause: %s", f.getName(), ex.getMessage());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
index 7577815..dcfe32f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/config/PluginProperties.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.rabbitmq.config;
+import com.google.common.flogger.FluentLogger;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
@@ -30,12 +31,10 @@
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PluginProperties implements Properties {
- private static final Logger LOGGER = LoggerFactory.getLogger(PluginProperties.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int MINIMUM_CONNECTION_MONITOR_INTERVAL = 5000;
@@ -77,9 +76,9 @@
@Override
public boolean load(Properties baseProperties) {
initialize();
- LOGGER.info("Loading {} ...", propertiesFile);
+ logger.atInfo().log("Loading %s", propertiesFile);
if (!Files.exists(propertiesFile)) {
- LOGGER.warn("No {}", propertiesFile);
+ logger.atWarning().log("No %s", propertiesFile);
return false;
}
@@ -87,10 +86,10 @@
try {
cfg.load();
} catch (ConfigInvalidException e) {
- LOGGER.info("{} has invalid format: {}", propertiesFile, e.getMessage());
+ logger.atInfo().log("%s has invalid format: %s", propertiesFile, e.getMessage());
return false;
} catch (IOException e) {
- LOGGER.info("Cannot read {}: {}", propertiesFile, e.getMessage());
+ logger.atInfo().log("Cannot read %s: %s", propertiesFile, e.getMessage());
return false;
}
for (Section section : getSections()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 06da71c..0a35f54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -14,9 +14,10 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
-import com.google.gerrit.common.EventListener;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -29,12 +30,11 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class MessagePublisher implements Publisher, LifecycleListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
private static final int MAX_EVENTS = 16384;
private static final int MONITOR_FIRSTTIME_DELAY = 15000;
private static final String END_OF_STREAM = "END-OF-STREAM_$F7;XTSUQ(Dv#N6]g+gd,,uzRp%G-P";
@@ -70,12 +70,13 @@
if (queue.offer(event)) {
if (lostEventCount > 0) {
- LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+ logger.atWarning().log(
+ "Event queue is no longer full, %d events were lost", lostEventCount);
lostEventCount = 0;
}
} else {
if (lostEventCount++ % 10 == 0) {
- LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+ logger.atSevere().log("Event queue is full, lost %d event(s)", lostEventCount);
}
}
}
@@ -99,10 +100,11 @@
}
}
if (!publishEvent(event) && !queue.offer(event)) {
- LOGGER.error("Event lost: {}", gson.toJson(event));
+ logger.atSevere().log("Event lost: %s", gson.toJson(event));
}
} catch (InterruptedException e) {
- LOGGER.warn("Interupted while waiting for event or connection.", e);
+ logger.atWarning().withCause(e).log(
+ "Interrupted while waiting for event or connection.");
}
}
}
@@ -135,7 +137,7 @@
@Override
public void run() {
if (!isConnected()) {
- LOGGER.info("#start: try to reconnect");
+ logger.atInfo().log("#start: try to reconnect");
connect();
}
}
@@ -192,7 +194,7 @@
private synchronized void ensurePublisherThreadStarted() {
if (publisherThread == null || !publisherThread.isAlive()) {
- LOGGER.info("Creating new publisher thread.");
+ logger.atInfo().log("Creating new publisher thread.");
publisherThread = new Thread(publisher);
publisherThread.setName("rabbitmq-publisher");
publisherThread.start();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index f782b00..8ef1b3e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -1,6 +1,6 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
-import com.google.gerrit.common.EventListener;
+import com.google.gerrit.server.events.EventListener;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
public interface Publisher {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index 5523b16..8f91772 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.rabbitmq.session.type;
+import com.google.common.flogger.FluentLogger;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.AMQP;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Exchange;
@@ -36,10 +37,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.CharEncoding;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public final class AMQPSession implements Session {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private class ShutdownListenerImpl implements ShutdownListener {
@@ -62,10 +62,10 @@
if (clazz == Channel.class) {
Channel ch = Channel.class.cast(obj);
if (cause.isInitiatedByApplication()) {
- LOGGER.info(MSG("Channel #{} closed by application."), ch.getChannelNumber());
+ logger.atInfo().log(MSG("Channel #%d closed by application."), ch.getChannelNumber());
} else {
- LOGGER.warn(
- MSG("Channel #{} closed. Cause: {}"), ch.getChannelNumber(), cause.getMessage());
+ logger.atWarning().log(
+ MSG("Channel #%d closed. Cause: %s"), ch.getChannelNumber(), cause.getMessage());
}
if (ch.equals(AMQPSession.this.channel)) {
AMQPSession.this.channel = null;
@@ -73,9 +73,9 @@
} else if (clazz == Connection.class) {
Connection conn = Connection.class.cast(obj);
if (cause.isInitiatedByApplication()) {
- LOGGER.info(MSG("Connection closed by application."));
+ logger.atInfo().log(MSG("Connection closed by application."));
} else {
- LOGGER.warn(MSG("Connection closed. Cause: {}"), cause.getMessage());
+ logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage());
}
if (conn.equals(AMQPSession.this.connection)) {
AMQPSession.this.connection = null;
@@ -86,7 +86,6 @@
}
}
- private static final Logger LOGGER = LoggerFactory.getLogger(AMQPSession.class);
private final Properties properties;
private volatile Connection connection;
private volatile Channel channel;
@@ -120,13 +119,13 @@
ch = connection.createChannel();
ch.addShutdownListener(channelListener);
failureCount.set(0);
- LOGGER.info(MSG("Channel #{} opened."), ch.getChannelNumber());
+ logger.atInfo().log(MSG("Channel #%d opened."), ch.getChannelNumber());
} catch (IOException | AlreadyClosedException ex) {
- LOGGER.error(MSG("Failed to open channel."), ex);
+ logger.atSevere().withCause(ex).log(MSG("Failed to open channel."));
failureCount.incrementAndGet();
}
if (failureCount.get() > properties.getSection(Monitor.class).failureCount) {
- LOGGER.warn("Connection has something wrong. So will be disconnected.");
+ logger.atWarning().log("Creating channel failed %d times, closing connection.", failureCount.get());
disconnect();
}
}
@@ -136,11 +135,11 @@
@Override
public boolean connect() {
if (connection != null && connection.isOpen()) {
- LOGGER.info(MSG("Already connected."));
+ logger.atInfo().log(MSG("Already connected."));
return true;
}
AMQP amqp = properties.getSection(AMQP.class);
- LOGGER.info(MSG("Connect to {}..."), amqp.uri);
+ logger.atInfo().log(MSG("Connect to %s..."), amqp.uri);
ConnectionFactory factory = new ConnectionFactory();
try {
if (StringUtils.isNotEmpty(amqp.uri)) {
@@ -157,40 +156,40 @@
}
connection = factory.newConnection();
connection.addShutdownListener(connectionListener);
- LOGGER.info(MSG("Connection established."));
+ logger.atInfo().log(MSG("Connection established."));
return true;
}
} catch (URISyntaxException ex) {
- LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
+ logger.atSevere().withCause(ex).log(MSG("URI syntax error: %s"), amqp.uri);
} catch (IOException | TimeoutException ex) {
- LOGGER.error(MSG("Connection cannot be opened."), ex);
+ logger.atSevere().withCause(ex).log(MSG("Connection cannot be opened."));
} catch (KeyManagementException | NoSuchAlgorithmException ex) {
- LOGGER.error(MSG("Security error when opening connection."), ex);
+ logger.atSevere().withCause(ex).log(MSG("Security error when opening connection."));
}
return false;
}
@Override
public void disconnect() {
- LOGGER.info(MSG("Disconnecting..."));
+ logger.atInfo().log(MSG("Disconnecting..."));
try {
if (channel != null) {
- LOGGER.info(MSG("Closing Channel #{}..."), channel.getChannelNumber());
+ logger.atInfo().log(MSG("Closing Channel #%d..."), channel.getChannelNumber());
channel.close();
}
} catch (IOException | TimeoutException ex) {
- LOGGER.error(MSG("Error when closing channel."), ex);
+ logger.atSevere().withCause(ex).log(MSG("Error when closing channel."));
} finally {
channel = null;
}
try {
if (connection != null) {
- LOGGER.info(MSG("Closing Connection..."));
+ logger.atInfo().log(MSG("Closing Connection..."));
connection.close();
}
} catch (IOException | ShutdownSignalException ex) {
- LOGGER.warn(MSG("Error when closing connection."), ex);
+ logger.atWarning().withCause(ex).log(MSG("Error when closing connection."));
} finally {
connection = null;
}
@@ -205,7 +204,7 @@
Message message = properties.getSection(Message.class);
Exchange exchange = properties.getSection(Exchange.class);
try {
- LOGGER.debug(MSG("Sending message."));
+ logger.atFine().log(MSG("Sending message."));
channel.basicPublish(
exchange.name,
message.routingKey,
@@ -213,11 +212,11 @@
messageBody.getBytes(CharEncoding.UTF_8));
return true;
} catch (IOException ex) {
- LOGGER.error(MSG("Error when sending meessage."), ex);
+ logger.atSevere().withCause(ex).log(MSG("Error when sending message."));
return false;
}
}
- LOGGER.error(MSG("Cannot open channel."));
+ logger.atSevere().log(MSG("Cannot open channel."));
return false;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
index 5ac8764..53ed103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
@@ -14,19 +14,18 @@
package com.googlesource.gerrit.plugins.rabbitmq.worker;
-import com.google.gerrit.common.EventListener;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Singleton
public class DefaultEventWorker implements EventListener, EventWorker {
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventWorker.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Set<Publisher> publishers = new CopyOnWriteArraySet<>();
@@ -36,8 +35,8 @@
}
@Override
- public void addPublisher(Publisher publisher, String userName) {
- LOGGER.warn("addPublisher() with username '{}' was called. Hence no operation.", userName);
+ public void addPublisher(String pluginName, Publisher publisher, String userName) {
+ logger.atWarning().log("addPublisher() with username '%s' was called. No-op.", userName);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
index e25d3c6..97392c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/EventWorker.java
@@ -5,7 +5,7 @@
public interface EventWorker {
void addPublisher(Publisher publisher);
- void addPublisher(Publisher publisher, String userName);
+ void addPublisher(String pluginName, Publisher publisher, String userName);
void removePublisher(Publisher publisher);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
index 126a8b7..de21369 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.rabbitmq.worker;
-import com.google.gerrit.common.UserScopedEventListener;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.extensions.registration.RegistrationHandle;
import com.google.gerrit.reviewdb.client.Account;
@@ -24,6 +24,7 @@
import com.google.gerrit.server.PluginUser;
import com.google.gerrit.server.account.AccountResolver;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.UserScopedEventListener;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
@@ -37,12 +38,10 @@
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class UserEventWorker implements EventWorker {
- private static final Logger LOGGER = LoggerFactory.getLogger(UserEventWorker.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final DynamicSet<UserScopedEventListener> eventListeners;
private final WorkQueue workQueue;
@@ -74,11 +73,12 @@
@Override
public void addPublisher(final Publisher publisher) {
- LOGGER.warn("addPublisher() without username was called. Hence no operation.");
+ logger.atWarning().log("addPublisher() without username was called. No-op.");
}
@Override
- public void addPublisher(final Publisher publisher, final String userName) {
+ public void addPublisher(
+ final String pluginName, final Publisher publisher, final String userName) {
workQueue
.getDefaultQueue()
.submit(
@@ -117,13 +117,13 @@
try {
userAccount = accountResolver.find(userName);
if (userAccount == null) {
- LOGGER.error(
- "No single user could be found when searching for listenAs: {}", userName);
+ logger.atSevere().log("Cannot find account for listenAs: %s", userName);
return;
}
final IdentifiedUser user = userFactory.create(userAccount.getId());
RegistrationHandle registration =
eventListeners.add(
+ pluginName,
new UserScopedEventListener() {
@Override
public void onEvent(Event event) {
@@ -136,9 +136,9 @@
}
});
eventListenerRegistrations.put(publisher, registration);
- LOGGER.info("Listen events as : {}", userName);
+ logger.atInfo().log("Listen events as : %s", userName);
} catch (OrmException | ConfigInvalidException | IOException e) {
- LOGGER.error("Could not query database for listenAs", e);
+ logger.atSevere().withCause(e).log("Could not query database for listenAs");
return;
} finally {
threadLocalRequestContext.setContext(old);