Merge branch 'stable-2.14' into 'stable-2.15'

* google/stable-2.14:
  Name the publisher thread
  Do not block Gerrit's event bus when queue is full
  Clean up the Publisher interface and implementations
  Format Java files with google-java-format
  Serialize Project.NameKey correctly

Change-Id: Ie9f94aaf1e3e5c831c305d0cfce85ce61ed00a4e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
index fcc4e26..1b7c975 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.rabbitmq.message;
 
 import com.google.common.base.Supplier;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.ProjectNameKeySerializer;
 import com.google.gerrit.server.events.SupplierSerializer;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -24,6 +26,9 @@
 
   @Override
   public Gson get() {
-    return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
+    return new GsonBuilder()
+        .registerTypeAdapter(Supplier.class, new SupplierSerializer())
+        .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeySerializer())
+        .create();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 607f4d2..e93f6fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -30,7 +30,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +44,9 @@
   private final Properties properties;
   private final Gson gson;
   private final Timer monitorTimer = new Timer();
-  private boolean available = true;
-  private EventListener eventListener;
   private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
+  private final Object sessionMon = new Object();
+  private EventListener eventListener;
   private CancelableRunnable publisher;
   private Thread publisherThread;
 
@@ -61,15 +60,22 @@
     this.gson = gson;
     this.eventListener =
         new EventListener() {
+          private int lostEventCount = 0;
+
           @Override
           public void onEvent(Event event) {
-            try {
-              if (!publisherThread.isAlive()) {
-                publisherThread.start();
+            if (!publisherThread.isAlive()) {
+              publisherThread.start();
+            }
+            if (queue.offer(event)) {
+              if (lostEventCount > 0) {
+                LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+                lostEventCount = 0;
               }
-              queue.put(event);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Failed to queue event", e);
+            } else {
+              if (lostEventCount++ % 10 == 0) {
+                LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+              }
             }
           }
         };
@@ -82,20 +88,17 @@
           public void run() {
             while (!canceled) {
               try {
-                if (isEnable() && session.isOpen()) {
-                  Event event = queue.poll(200, TimeUnit.MILLISECONDS);
-                  if (event != null) {
-                    if (isEnable() && session.isOpen()) {
-                      publishEvent(event);
-                    } else {
-                      queue.put(event);
-                    }
+                Event event = queue.take();
+                while (!isConnected() && !canceled) {
+                  synchronized (sessionMon) {
+                    sessionMon.wait(1000);
                   }
-                } else {
-                  Thread.sleep(1000);
+                }
+                if (!publishEvent(event) && !queue.offer(event)) {
+                  LOGGER.error("Event lost: {}", gson.toJson(event));
                 }
               } catch (InterruptedException e) {
-                LOGGER.warn("Interupted while taking event", e);
+                LOGGER.warn("Interupted while waiting for event or connection.", e);
               }
             }
           }
@@ -118,22 +121,22 @@
   @Override
   public void start() {
     publisherThread = new Thread(publisher);
+    publisherThread.setName("rabbitmq-publisher");
     publisherThread.start();
-    if (!session.isOpen()) {
-      session.connect();
+    if (!isConnected()) {
+      connect();
       monitorTimer.schedule(
           new TimerTask() {
             @Override
             public void run() {
-              if (!session.isOpen()) {
+              if (!isConnected()) {
                 LOGGER.info("#start: try to reconnect");
-                session.connect();
+                connect();
               }
             }
           },
           MONITOR_FIRSTTIME_DELAY,
           properties.getSection(Monitor.class).interval);
-      available = true;
     }
   }
 
@@ -149,27 +152,6 @@
       }
     }
     session.disconnect();
-    available = false;
-  }
-
-  @Override
-  public void enable() {
-    available = true;
-  }
-
-  @Override
-  public void disable() {
-    available = false;
-  }
-
-  @Override
-  public boolean isEnable() {
-    return available;
-  }
-
-  @Override
-  public Session getSession() {
-    return session;
   }
 
   @Override
@@ -187,7 +169,19 @@
     return this.eventListener;
   }
 
-  private void publishEvent(Event event) {
-    session.publish(gson.toJson(event));
+  private boolean isConnected() {
+    return session != null && session.isOpen();
+  }
+
+  private boolean publishEvent(Event event) {
+    return session.publish(gson.toJson(event));
+  }
+
+  private void connect() {
+    if (!isConnected() && session.connect()) {
+      synchronized (sessionMon) {
+        sessionMon.notifyAll();
+      }
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index b74b94c..f782b00 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -2,21 +2,12 @@
 
 import com.google.gerrit.common.EventListener;
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
-import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
 
 public interface Publisher {
   void start();
 
   void stop();
 
-  void enable();
-
-  void disable();
-
-  boolean isEnable();
-
-  Session getSession();
-
   Properties getProperties();
 
   String getName();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 180aeda..7d0669f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -16,9 +16,9 @@
 public interface Session {
   boolean isOpen();
 
-  void connect();
+  boolean connect();
 
   void disconnect();
 
-  void publish(String message);
+  boolean publish(String message);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index 38b668f..6284b86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -133,10 +133,10 @@
   }
 
   @Override
-  public void connect() {
+  public boolean connect() {
     if (connection != null && connection.isOpen()) {
       LOGGER.info(MSG("Already connected."));
-      return;
+      return true;
     }
     AMQP amqp = properties.getSection(AMQP.class);
     LOGGER.info(MSG("Connect to {}..."), amqp.uri);
@@ -157,6 +157,7 @@
         connection = factory.newConnection();
         connection.addShutdownListener(connectionListener);
         LOGGER.info(MSG("Connection established."));
+        return true;
       }
     } catch (URISyntaxException ex) {
       LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
@@ -165,6 +166,7 @@
     } catch (KeyManagementException | NoSuchAlgorithmException ex) {
       LOGGER.error(MSG("Security error when opening connection."), ex);
     }
+    return false;
   }
 
   @Override
@@ -194,7 +196,7 @@
   }
 
   @Override
-  public void publish(String messageBody) {
+  public boolean publish(String messageBody) {
     if (channel == null || !channel.isOpen()) {
       channel = getChannel();
     }
@@ -208,9 +210,13 @@
             message.routingKey,
             properties.getAMQProperties().getBasicProperties(),
             messageBody.getBytes(CharEncoding.UTF_8));
+        return true;
       } catch (IOException ex) {
         LOGGER.error(MSG("Error when sending meessage."), ex);
+        return false;
       }
     }
+    LOGGER.error(MSG("Cannot open channel."));
+    return false;
   }
 }