Update amqp-client to 5.10.0

Replace the ShutdownListener implementations with lambda expressions.
See https://github.com/rabbitmq/rabbitmq-java-client/releases/tag/v5.0.0

Instead of setting channel and connection to null in the
ShutdownListeners, rely on ShutdownNotifierComponent#isOpen(), which
returns false if there is a shutdown cause.

Change-Id: Ia8304b775fb470613b4ae40210a81f6b1ffb55f9
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0af44ef..2827eee 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -3,6 +3,6 @@
 def external_plugin_deps():
     maven_jar(
         name = "amqp_client",
-        artifact = "com.rabbitmq:amqp-client:4.1.1",
-        sha1 = "256f6c92c55a8d3cfae8d32e1a15713baedab184",
+        artifact = "com.rabbitmq:amqp-client:5.10.0",
+        sha1 = "4de351467a13b8ca4eb7e8023032f9f964a21796",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index c77ed05..1d3c4f0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -26,8 +26,6 @@
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.ShutdownListener;
-import com.rabbitmq.client.ShutdownNotifier;
 import com.rabbitmq.client.ShutdownSignalException;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -41,58 +39,11 @@
 public final class AMQPSession implements Session {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private class ShutdownListenerImpl implements ShutdownListener {
-
-    private final Class<?> clazz;
-
-    <T extends ShutdownNotifier> ShutdownListenerImpl(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void shutdownCompleted(ShutdownSignalException cause) {
-      if (cause != null) {
-        Object obj = cause.getReference();
-        if (Channel.class.isInstance(obj)) {
-          Channel.class.cast(obj).removeShutdownListener(this);
-        } else if (Connection.class.isInstance(obj)) {
-          Connection.class.cast(obj).removeShutdownListener(this);
-        }
-        if (clazz.isInstance(obj)) {
-          if (clazz == Channel.class) {
-            Channel ch = Channel.class.cast(obj);
-            if (cause.isInitiatedByApplication()) {
-              logger.atInfo().log(MSG("Channel #%d closed by application."), ch.getChannelNumber());
-            } else {
-              logger.atWarning().log(
-                  MSG("Channel #%dclosed. Cause: %s"), ch.getChannelNumber(), cause.getMessage());
-            }
-            if (ch.equals(AMQPSession.this.channel)) {
-              AMQPSession.this.channel = null;
-            }
-          } else if (clazz == Connection.class) {
-            Connection conn = Connection.class.cast(obj);
-            if (cause.isInitiatedByApplication()) {
-              logger.atInfo().log(MSG("Connection closed by application."));
-            } else {
-              logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage());
-            }
-            if (conn.equals(AMQPSession.this.connection)) {
-              AMQPSession.this.connection = null;
-            }
-          }
-        }
-      }
-    }
-  }
-
   private final Properties properties;
   private volatile Connection connection;
   private volatile Channel channel;
 
   private final AtomicInteger failureCount = new AtomicInteger(0);
-  private final ShutdownListener connectionListener = new ShutdownListenerImpl(Connection.class);
-  private final ShutdownListener channelListener = new ShutdownListenerImpl(Channel.class);
 
   public AMQPSession(Properties properties) {
     this.properties = properties;
@@ -104,22 +55,31 @@
 
   @Override
   public boolean isOpen() {
-    if (connection != null) {
+    if (connection != null && connection.isOpen()) {
       return true;
     }
     return false;
   }
 
   private Channel getChannel() {
-    Channel ch = null;
-    if (connection == null) {
+    if (!isOpen()) {
       connect();
     } else {
       try {
-        ch = connection.createChannel();
-        ch.addShutdownListener(channelListener);
+        Channel ch = connection.createChannel();
+        int channelId = ch.getChannelNumber();
+        ch.addShutdownListener(
+            cause -> {
+              if (cause.isInitiatedByApplication()) {
+                logger.atInfo().log(MSG("Channel #%d closed by application."), channelId);
+              } else {
+                logger.atWarning().log(
+                    MSG("Channel #%d closed. Cause: %s"), channelId, cause.getMessage());
+              }
+            });
         failureCount.set(0);
-        logger.atInfo().log(MSG("Channel #%d opened."), ch.getChannelNumber());
+        logger.atInfo().log(MSG("Channel #%d opened."), channelId);
+        return ch;
       } catch (IOException | AlreadyClosedException ex) {
         logger.atSevere().withCause(ex).log(MSG("Failed to open channel."));
         failureCount.incrementAndGet();
@@ -130,12 +90,12 @@
         disconnect();
       }
     }
-    return ch;
+    return null;
   }
 
   @Override
   public boolean connect() {
-    if (connection != null && connection.isOpen()) {
+    if (isOpen()) {
       logger.atInfo().log(MSG("Already connected."));
       return true;
     }
@@ -156,7 +116,14 @@
           factory.setPassword(amqp.password);
         }
         connection = factory.newConnection();
-        connection.addShutdownListener(connectionListener);
+        connection.addShutdownListener(
+            cause -> {
+              if (cause.isInitiatedByApplication()) {
+                logger.atInfo().log(MSG("Connection closed by application."));
+              } else {
+                logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage());
+              }
+            });
         logger.atInfo().log(MSG("Connection established."));
         return true;
       }