Merge branch 'stable-2.15'

* stable-2.15:
  Catch AlreadyClosedException and retry connection
  Rename dependencies to align with core
  Name the publisher thread
  Do not block Gerrit's event bus when queue is full
  Clean up the Publisher interface and implementations
  Upgrade amqp-client to 4.1.1
  Format Java files with google-java-format
  Serialize Project.NameKey correctly

Change-Id: Ic8cade8a935991606fc8c67d65ecfc455d1b9f97
diff --git a/BUILD b/BUILD
index 3f40714..f42177c 100644
--- a/BUILD
+++ b/BUILD
@@ -3,18 +3,18 @@
 gerrit_plugin(
     name = "rabbitmq",
     srcs = glob(["src/main/java/**/*.java"]),
-    resources = glob(["src/main/resources/**/*"]),
     manifest_entries = [
         "Gerrit-PluginName: rabbitmq",
         "Gerrit-Module: com.googlesource.gerrit.plugins.rabbitmq.Module",
         "Implementation-Title: Gerrit rabbitmq plugin",
         "Implementation-URL: https://gerrit-review.googlesource.com/#/admin/projects/plugins/rabbitmq",
     ],
+    resources = glob(["src/main/resources/**/*"]),
     deps = [
         "@amqp_client//jar",
-        "@commons_codec//jar:neverlink",
-        "@commons_io//jar",
-        "@commons_lang//jar:neverlink",
+        "@commons-codec//jar:neverlink",
+        "@commons-io//jar",
+        "@commons-lang//jar:neverlink",
         "@gson//jar:neverlink",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 70ed9e6..0af44ef 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -3,6 +3,6 @@
 def external_plugin_deps():
     maven_jar(
         name = "amqp_client",
-        artifact = "com.rabbitmq:amqp-client:3.5.2",
-        sha1 = "8d10edd29e08f78349bd1da9d18f81c9f8b90567",
+        artifact = "com.rabbitmq:amqp-client:4.1.1",
+        sha1 = "256f6c92c55a8d3cfae8d32e1a15713baedab184",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
index fcc4e26..1b7c975 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.rabbitmq.message;
 
 import com.google.common.base.Supplier;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.ProjectNameKeySerializer;
 import com.google.gerrit.server.events.SupplierSerializer;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -24,6 +26,9 @@
 
   @Override
   public Gson get() {
-    return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
+    return new GsonBuilder()
+        .registerTypeAdapter(Supplier.class, new SupplierSerializer())
+        .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeySerializer())
+        .create();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 1651608..37adac7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -30,7 +30,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +44,9 @@
   private final Properties properties;
   private final Gson gson;
   private final Timer monitorTimer = new Timer();
-  private boolean available = true;
-  private EventListener eventListener;
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
+  private final Object sessionMon = new Object();
+  private EventListener eventListener;
   private CancelableRunnable publisher;
   private Thread publisherThread;
 
@@ -61,15 +60,22 @@
     this.gson = gson;
     this.eventListener =
         new EventListener() {
+          private int lostEventCount = 0;
+
           @Override
           public void onEvent(Event event) {
-            try {
-              if (!publisherThread.isAlive()) {
-                publisherThread.start();
+            if (!publisherThread.isAlive()) {
+              publisherThread.start();
+            }
+            if (queue.offer(event)) {
+              if (lostEventCount > 0) {
+                LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+                lostEventCount = 0;
               }
-              queue.put(event);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Failed to queue event", e);
+            } else {
+              if (lostEventCount++ % 10 == 0) {
+                LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+              }
             }
           }
         };
@@ -82,20 +88,17 @@
           public void run() {
             while (!canceled) {
               try {
-                if (isEnable() && session.isOpen()) {
-                  Event event = queue.poll(200, TimeUnit.MILLISECONDS);
-                  if (event != null) {
-                    if (isEnable() && session.isOpen()) {
-                      publishEvent(event);
-                    } else {
-                      queue.put(event);
-                    }
+                Event event = queue.take();
+                while (!isConnected() && !canceled) {
+                  synchronized (sessionMon) {
+                    sessionMon.wait(1000);
                   }
-                } else {
-                  Thread.sleep(1000);
+                }
+                if (!publishEvent(event) && !queue.offer(event)) {
+                  LOGGER.error("Event lost: {}", gson.toJson(event));
                 }
               } catch (InterruptedException e) {
-                LOGGER.warn("Interupted while taking event", e);
+                LOGGER.warn("Interupted while waiting for event or connection.", e);
               }
             }
           }
@@ -118,22 +121,22 @@
   @Override
   public void start() {
     publisherThread = new Thread(publisher);
+    publisherThread.setName("rabbitmq-publisher");
     publisherThread.start();
-    if (!session.isOpen()) {
-      session.connect();
+    if (!isConnected()) {
+      connect();
       monitorTimer.schedule(
           new TimerTask() {
             @Override
             public void run() {
-              if (!session.isOpen()) {
+              if (!isConnected()) {
                 LOGGER.info("#start: try to reconnect");
-                session.connect();
+                connect();
               }
             }
           },
           MONITOR_FIRSTTIME_DELAY,
           properties.getSection(Monitor.class).interval);
-      available = true;
     }
   }
 
@@ -149,27 +152,6 @@
       }
     }
     session.disconnect();
-    available = false;
-  }
-
-  @Override
-  public void enable() {
-    available = true;
-  }
-
-  @Override
-  public void disable() {
-    available = false;
-  }
-
-  @Override
-  public boolean isEnable() {
-    return available;
-  }
-
-  @Override
-  public Session getSession() {
-    return session;
   }
 
   @Override
@@ -187,7 +169,19 @@
     return this.eventListener;
   }
 
-  private void publishEvent(Event event) {
-    session.publish(gson.toJson(event));
+  private boolean isConnected() {
+    return session != null && session.isOpen();
+  }
+
+  private boolean publishEvent(Event event) {
+    return session.publish(gson.toJson(event));
+  }
+
+  private void connect() {
+    if (!isConnected() && session.connect()) {
+      synchronized (sessionMon) {
+        sessionMon.notifyAll();
+      }
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index 81998ec..8ef1b3e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -2,21 +2,12 @@
 
 import com.google.gerrit.server.events.EventListener;
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
-import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
 
 public interface Publisher {
   void start();
 
   void stop();
 
-  void enable();
-
-  void disable();
-
-  boolean isEnable();
-
-  Session getSession();
-
   Properties getProperties();
 
   String getName();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 180aeda..7d0669f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -16,9 +16,9 @@
 public interface Session {
   boolean isOpen();
 
-  void connect();
+  boolean connect();
 
   void disconnect();
 
-  void publish(String message);
+  boolean publish(String message);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index d0a872b..3ae3104 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -21,6 +21,7 @@
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Message;
 import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
 import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
@@ -120,7 +121,7 @@
         ch.addShutdownListener(channelListener);
         failureCount.set(0);
         LOGGER.info(MSG("Channel #{} opened."), ch.getChannelNumber());
-      } catch (IOException ex) {
+      } catch (IOException | AlreadyClosedException ex) {
         LOGGER.error(MSG("Failed to open channel."), ex);
         failureCount.incrementAndGet();
       }
@@ -133,10 +134,10 @@
   }
 
   @Override
-  public void connect() {
+  public boolean connect() {
     if (connection != null && connection.isOpen()) {
       LOGGER.info(MSG("Already connected."));
-      return;
+      return true;
     }
     AMQP amqp = properties.getSection(AMQP.class);
     LOGGER.info(MSG("Connect to {}..."), amqp.uri);
@@ -157,14 +158,16 @@
         connection = factory.newConnection();
         connection.addShutdownListener(connectionListener);
         LOGGER.info(MSG("Connection established."));
+        return true;
       }
     } catch (URISyntaxException ex) {
       LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
-    } catch (IOException ex) {
+    } catch (IOException | TimeoutException ex) {
       LOGGER.error(MSG("Connection cannot be opened."), ex);
     } catch (KeyManagementException | NoSuchAlgorithmException ex) {
       LOGGER.error(MSG("Security error when opening connection."), ex);
     }
+    return false;
   }
 
   @Override
@@ -194,7 +197,7 @@
   }
 
   @Override
-  public void publish(String messageBody) {
+  public boolean publish(String messageBody) {
     if (channel == null || !channel.isOpen()) {
       channel = getChannel();
     }
@@ -208,9 +211,13 @@
             message.routingKey,
             properties.getAMQProperties().getBasicProperties(),
             messageBody.getBytes(CharEncoding.UTF_8));
+        return true;
       } catch (IOException ex) {
         LOGGER.error(MSG("Error when sending meessage."), ex);
+        return false;
       }
     }
+    LOGGER.error(MSG("Cannot open channel."));
+    return false;
   }
 }