Update amqp-client to 5.10.0
Change ShutdownListeners to lambda expressions.
https://github.com/rabbitmq/rabbitmq-java-client/releases/tag/v5.0.0
Instead of setting channel and connection to null in ShutdownListeners,
trust ShutdownNotifierComponent#isOpen() that returns false if there
is a shutdown-cause.
Change-Id: Ia8304b775fb470613b4ae40210a81f6b1ffb55f9
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0af44ef..2827eee 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -3,6 +3,6 @@
def external_plugin_deps():
maven_jar(
name = "amqp_client",
- artifact = "com.rabbitmq:amqp-client:4.1.1",
- sha1 = "256f6c92c55a8d3cfae8d32e1a15713baedab184",
+ artifact = "com.rabbitmq:amqp-client:5.10.0",
+ sha1 = "4de351467a13b8ca4eb7e8023032f9f964a21796",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index c77ed05..1d3c4f0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -26,8 +26,6 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.ShutdownListener;
-import com.rabbitmq.client.ShutdownNotifier;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -41,58 +39,11 @@
public final class AMQPSession implements Session {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private class ShutdownListenerImpl implements ShutdownListener {
-
- private final Class<?> clazz;
-
- <T extends ShutdownNotifier> ShutdownListenerImpl(Class<T> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public void shutdownCompleted(ShutdownSignalException cause) {
- if (cause != null) {
- Object obj = cause.getReference();
- if (Channel.class.isInstance(obj)) {
- Channel.class.cast(obj).removeShutdownListener(this);
- } else if (Connection.class.isInstance(obj)) {
- Connection.class.cast(obj).removeShutdownListener(this);
- }
- if (clazz.isInstance(obj)) {
- if (clazz == Channel.class) {
- Channel ch = Channel.class.cast(obj);
- if (cause.isInitiatedByApplication()) {
- logger.atInfo().log(MSG("Channel #%d closed by application."), ch.getChannelNumber());
- } else {
- logger.atWarning().log(
- MSG("Channel #%dclosed. Cause: %s"), ch.getChannelNumber(), cause.getMessage());
- }
- if (ch.equals(AMQPSession.this.channel)) {
- AMQPSession.this.channel = null;
- }
- } else if (clazz == Connection.class) {
- Connection conn = Connection.class.cast(obj);
- if (cause.isInitiatedByApplication()) {
- logger.atInfo().log(MSG("Connection closed by application."));
- } else {
- logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage());
- }
- if (conn.equals(AMQPSession.this.connection)) {
- AMQPSession.this.connection = null;
- }
- }
- }
- }
- }
- }
-
private final Properties properties;
private volatile Connection connection;
private volatile Channel channel;
private final AtomicInteger failureCount = new AtomicInteger(0);
- private final ShutdownListener connectionListener = new ShutdownListenerImpl(Connection.class);
- private final ShutdownListener channelListener = new ShutdownListenerImpl(Channel.class);
public AMQPSession(Properties properties) {
this.properties = properties;
@@ -104,22 +55,31 @@
@Override
public boolean isOpen() {
- if (connection != null) {
+ if (connection != null && connection.isOpen()) {
return true;
}
return false;
}
private Channel getChannel() {
- Channel ch = null;
- if (connection == null) {
+ if (!isOpen()) {
connect();
} else {
try {
- ch = connection.createChannel();
- ch.addShutdownListener(channelListener);
+ Channel ch = connection.createChannel();
+ int channelId = ch.getChannelNumber();
+ ch.addShutdownListener(
+ cause -> {
+ if (cause.isInitiatedByApplication()) {
+ logger.atInfo().log(MSG("Channel #%d closed by application."), channelId);
+ } else {
+ logger.atWarning().log(
+ MSG("Channel #%d closed. Cause: %s"), channelId, cause.getMessage());
+ }
+ });
failureCount.set(0);
- logger.atInfo().log(MSG("Channel #%d opened."), ch.getChannelNumber());
+ logger.atInfo().log(MSG("Channel #%d opened."), channelId);
+ return ch;
} catch (IOException | AlreadyClosedException ex) {
logger.atSevere().withCause(ex).log(MSG("Failed to open channel."));
failureCount.incrementAndGet();
@@ -130,12 +90,12 @@
disconnect();
}
}
- return ch;
+ return null;
}
@Override
public boolean connect() {
- if (connection != null && connection.isOpen()) {
+ if (isOpen()) {
logger.atInfo().log(MSG("Already connected."));
return true;
}
@@ -156,7 +116,14 @@
factory.setPassword(amqp.password);
}
connection = factory.newConnection();
- connection.addShutdownListener(connectionListener);
+ connection.addShutdownListener(
+ cause -> {
+ if (cause.isInitiatedByApplication()) {
+ logger.atInfo().log(MSG("Connection closed by application."));
+ } else {
+ logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage());
+ }
+ });
logger.atInfo().log(MSG("Connection established."));
return true;
}