Merge branch 'stable-2.14' into 'stable-2.15'
* google/stable-2.14:
Name the publisher thread
Do not block Gerrit's event bus when queue is full
Clean up the Publisher interface and implementations
Format Java files with google-java-format
Serialize Project.NameKey correctly
Change-Id: Ie9f94aaf1e3e5c831c305d0cfce85ce61ed00a4e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
index fcc4e26..1b7c975 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/GsonProvider.java
@@ -15,6 +15,8 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
import com.google.common.base.Supplier;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.ProjectNameKeySerializer;
import com.google.gerrit.server.events.SupplierSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -24,6 +26,9 @@
@Override
public Gson get() {
- return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
+ return new GsonBuilder()
+ .registerTypeAdapter(Supplier.class, new SupplierSerializer())
+ .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeySerializer())
+ .create();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 607f4d2..e93f6fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -30,7 +30,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,9 +44,9 @@
private final Properties properties;
private final Gson gson;
private final Timer monitorTimer = new Timer();
- private boolean available = true;
- private EventListener eventListener;
private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);
+ private final Object sessionMon = new Object();
+ private EventListener eventListener;
private CancelableRunnable publisher;
private Thread publisherThread;
@@ -61,15 +60,22 @@
this.gson = gson;
this.eventListener =
new EventListener() {
+ private int lostEventCount = 0;
+
@Override
public void onEvent(Event event) {
- try {
- if (!publisherThread.isAlive()) {
- publisherThread.start();
+ if (!publisherThread.isAlive()) {
+ publisherThread.start();
+ }
+ if (queue.offer(event)) {
+ if (lostEventCount > 0) {
+ LOGGER.warn("Event queue is no longer full, {} events were lost", lostEventCount);
+ lostEventCount = 0;
}
- queue.put(event);
- } catch (InterruptedException e) {
- LOGGER.warn("Failed to queue event", e);
+ } else {
+ if (lostEventCount++ % 10 == 0) {
+ LOGGER.error("Event queue is full, lost {} event(s)", lostEventCount);
+ }
}
}
};
@@ -82,20 +88,17 @@
public void run() {
while (!canceled) {
try {
- if (isEnable() && session.isOpen()) {
- Event event = queue.poll(200, TimeUnit.MILLISECONDS);
- if (event != null) {
- if (isEnable() && session.isOpen()) {
- publishEvent(event);
- } else {
- queue.put(event);
- }
+ Event event = queue.take();
+ while (!isConnected() && !canceled) {
+ synchronized (sessionMon) {
+ sessionMon.wait(1000);
}
- } else {
- Thread.sleep(1000);
+ }
+ if (!publishEvent(event) && !queue.offer(event)) {
+ LOGGER.error("Event lost: {}", gson.toJson(event));
}
} catch (InterruptedException e) {
- LOGGER.warn("Interupted while taking event", e);
+ LOGGER.warn("Interupted while waiting for event or connection.", e);
}
}
}
@@ -118,22 +121,22 @@
@Override
public void start() {
publisherThread = new Thread(publisher);
+ publisherThread.setName("rabbitmq-publisher");
publisherThread.start();
- if (!session.isOpen()) {
- session.connect();
+ if (!isConnected()) {
+ connect();
monitorTimer.schedule(
new TimerTask() {
@Override
public void run() {
- if (!session.isOpen()) {
+ if (!isConnected()) {
LOGGER.info("#start: try to reconnect");
- session.connect();
+ connect();
}
}
},
MONITOR_FIRSTTIME_DELAY,
properties.getSection(Monitor.class).interval);
- available = true;
}
}
@@ -149,27 +152,6 @@
}
}
session.disconnect();
- available = false;
- }
-
- @Override
- public void enable() {
- available = true;
- }
-
- @Override
- public void disable() {
- available = false;
- }
-
- @Override
- public boolean isEnable() {
- return available;
- }
-
- @Override
- public Session getSession() {
- return session;
}
@Override
@@ -187,7 +169,19 @@
return this.eventListener;
}
- private void publishEvent(Event event) {
- session.publish(gson.toJson(event));
+ private boolean isConnected() {
+ return session != null && session.isOpen();
+ }
+
+ private boolean publishEvent(Event event) {
+ return session.publish(gson.toJson(event));
+ }
+
+ private void connect() {
+ if (!isConnected() && session.connect()) {
+ synchronized (sessionMon) {
+ sessionMon.notifyAll();
+ }
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index b74b94c..f782b00 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -2,21 +2,12 @@
import com.google.gerrit.common.EventListener;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
-import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
public interface Publisher {
void start();
void stop();
- void enable();
-
- void disable();
-
- boolean isEnable();
-
- Session getSession();
-
Properties getProperties();
String getName();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
index 180aeda..7d0669f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/Session.java
@@ -16,9 +16,9 @@
public interface Session {
boolean isOpen();
- void connect();
+ boolean connect();
void disconnect();
- void publish(String message);
+ boolean publish(String message);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
index 38b668f..6284b86 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -133,10 +133,10 @@
}
@Override
- public void connect() {
+ public boolean connect() {
if (connection != null && connection.isOpen()) {
LOGGER.info(MSG("Already connected."));
- return;
+ return true;
}
AMQP amqp = properties.getSection(AMQP.class);
LOGGER.info(MSG("Connect to {}..."), amqp.uri);
@@ -157,6 +157,7 @@
connection = factory.newConnection();
connection.addShutdownListener(connectionListener);
LOGGER.info(MSG("Connection established."));
+ return true;
}
} catch (URISyntaxException ex) {
LOGGER.error(MSG("URI syntax error: {}"), amqp.uri);
@@ -165,6 +166,7 @@
} catch (KeyManagementException | NoSuchAlgorithmException ex) {
LOGGER.error(MSG("Security error when opening connection."), ex);
}
+ return false;
}
@Override
@@ -194,7 +196,7 @@
}
@Override
- public void publish(String messageBody) {
+ public boolean publish(String messageBody) {
if (channel == null || !channel.isOpen()) {
channel = getChannel();
}
@@ -208,9 +210,13 @@
message.routingKey,
properties.getAMQProperties().getBasicProperties(),
messageBody.getBytes(CharEncoding.UTF_8));
+ return true;
} catch (IOException ex) {
LOGGER.error(MSG("Error when sending meessage."), ex);
+ return false;
}
}
+ LOGGER.error(MSG("Cannot open channel."));
+ return false;
}
}