Use queue to hold Events during connection glitches
Change-Id: I262ae8cf5800fea1a8c9ecdd610e385cc5ccf7ff
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
index 6c5f450..81ad042 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/MessagePublisher.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.rabbitmq.message;
+import com.google.gerrit.common.EventListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
import com.google.gson.Gson;
@@ -21,15 +22,20 @@
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.AMQP;
+import com.googlesource.gerrit.plugins.rabbitmq.config.section.Gerrit;
import com.googlesource.gerrit.plugins.rabbitmq.config.section.Monitor;
import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
import com.googlesource.gerrit.plugins.rabbitmq.session.SessionFactoryProvider;
+import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
public class MessagePublisher implements Publisher, LifecycleListener {
@@ -37,24 +43,84 @@
private final static int MONITOR_FIRSTTIME_DELAY = 15000;
+ private static final int MAX_EVENTS = 16384;
private final Session session;
private final Properties properties;
private final Gson gson;
private final Timer monitorTimer = new Timer();
private boolean available = true;
+ private EventListener eventListener;
+ private final LinkedBlockingQueue<Event> queue =
+ new LinkedBlockingQueue<>(MAX_EVENTS);
+ private CancelableRunnable publisher;
+ private Thread publisherThread;
@Inject
public MessagePublisher(
- @Assisted Properties properties,
+ @Assisted final Properties properties,
SessionFactoryProvider sessionFactoryProvider,
Gson gson) {
this.session = sessionFactoryProvider.get().create(properties);
this.properties = properties;
this.gson = gson;
+ this.eventListener = new EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ try {
+ if (!publisherThread.isAlive()) {
+ publisherThread.start();
+ }
+ queue.put(event);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Failed to queue event", e);
+ }
+ }
+ };
+ this.publisher = new CancelableRunnable() {
+
+ boolean canceled = false;
+
+ @Override
+ public void run() {
+ while (!canceled) {
+ try {
+ if (isEnable() && session.isOpen()) {
+ Event event = queue.poll(200, TimeUnit.MILLISECONDS);
+ if (event != null) {
+ if (isEnable() && session.isOpen()) {
+ publishEvent(event);
+ } else {
+ queue.put(event);
+ }
+ }
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interupted while taking event", e);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.canceled = true;
+ }
+
+ @Override
+ public String toString() {
+ return "Rabbitmq publisher: "
+ + properties.getSection(Gerrit.class).listenAs
+ + "-"
+ + properties.getSection(AMQP.class).uri;
+ }
+ };
}
@Override
public void start() {
+ publisherThread = new Thread(publisher);
+ publisherThread.start();
if (!session.isOpen()) {
session.connect();
monitorTimer.schedule(new TimerTask() {
@@ -73,18 +139,19 @@
@Override
public void stop() {
monitorTimer.cancel();
+ publisher.cancel();
+ if (publisherThread != null) {
+ try {
+ publisherThread.join();
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
session.disconnect();
available = false;
}
@Override
- public void onEvent(Event event) {
- if (available && session.isOpen()) {
- session.publish(gson.toJson(event));
- }
- }
-
- @Override
public void enable() {
available = true;
}
@@ -113,4 +180,13 @@
public String getName() {
return properties.getName();
}
+
+ @Override
+ public EventListener getEventListener() {
+ return this.eventListener;
+ }
+
+ private void publishEvent(Event event) {
+ session.publish(gson.toJson(event));
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
index 8cc38cb..e6eaae3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/Publisher.java
@@ -5,7 +5,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.session.Session;
-public interface Publisher extends EventListener {
+public interface Publisher {
public void start();
public void stop();
public void enable();
@@ -14,4 +14,5 @@
public Session getSession();
public Properties getProperties();
public String getName();
+ public EventListener getEventListener();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
index 45e0e97..5241ce3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
@@ -56,7 +56,7 @@
@Override
public void onEvent(Event event) {
for (Publisher publisher : publishers) {
- publisher.onEvent(event);
+ publisher.getEventListener().onEvent(event);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
index 2beee41..76ba4e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/UserEventWorker.java
@@ -111,7 +111,7 @@
}
IdentifiedUser user = userFactory.create(userAccount.getId());
- source.addEventListener(publisher, user);
+ source.addEventListener(publisher.getEventListener(), user);
LOGGER.info("Listen events as : {}", userName);
} catch (OrmException e) {
LOGGER.error("Could not query database for listenAs", e);
@@ -129,7 +129,7 @@
@Override
public void removePublisher(final Publisher publisher) {
- source.removeEventListener(publisher);
+ source.removeEventListener(publisher.getEventListener());
}
@Override