Add consumers from previous broker API implementations
The event-broker lib provides a default brokerapi implementation to
facilitate startup before the real implementation is available. We
should adopt all consumers that have registered to this or any other
previous brokerapi when events-rabbitmq is loaded.
This way we can safely load events-rabbitmq after plugins that sets
up consumers without loosing events.
Added some logging to help understanding load orders.
Draws heavy inspiration from events-kafka's implementation.
Change-Id: Iaffdbc662bee17de1714cbe19048c2d740805984
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
index 83a61f9..8f982fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
@@ -14,29 +14,51 @@
package com.googlesource.gerrit.plugins.rabbitmq;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiPublisher;
import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.google.common.flogger.FluentLogger;
+import java.util.Set;
@Singleton
public class BrokerApiManager implements LifecycleListener {
+ private final FluentLogger logger = FluentLogger.forEnclosingClass();
private final BrokerApiPublisher publisher;
private final BrokerApiSubscribers subscribers;
+ private final Set<TopicSubscriber> consumers;
+ private final BrokerApi brokerApi;
@Inject
- public BrokerApiManager(BrokerApiPublisher publisher, BrokerApiSubscribers subscribers) {
+ public BrokerApiManager(
+ BrokerApiPublisher publisher,
+ BrokerApiSubscribers subscribers,
+ Set<TopicSubscriber> consumers,
+ BrokerApi brokerApi) {
+ logger.atFine().log("BrokerApiManager Initialized");
this.publisher = publisher;
this.subscribers = subscribers;
+ this.consumers = consumers;
+ this.brokerApi = brokerApi;
}
@Override
- public void start() {}
+ public void start() {
+ logger.atInfo().log("BrokerApiManager started and loading existing subscribers");
+ consumers.forEach(
+ topicSubscriber ->
+ brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+ logger.atInfo().log(
+ "RabbitMQ broker started with %d topic subscribers", brokerApi.topicSubscribers().size());
+ }
@Override
public void stop() {
+ logger.atInfo().log("BrokerApiManager Stopping");
subscribers.stop();
publisher.stop();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index b9125ee..56acd18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -14,16 +14,21 @@
package com.googlesource.gerrit.plugins.rabbitmq;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
import com.googlesource.gerrit.plugins.rabbitmq.config.PluginProperties;
@@ -52,6 +57,7 @@
import com.googlesource.gerrit.plugins.rabbitmq.worker.UserEventWorker;
import java.io.File;
import java.io.IOException;
+import java.util.Set;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
@@ -61,6 +67,7 @@
private final RabbitMqBrokerApiModule rabbitMqBrokerApiModule;
private final boolean brokerApiEnabled;
+ private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
@Inject
public Module(
@@ -72,6 +79,14 @@
getBaseConfig(pluginName, pluginData).getBoolean("General", "enableBrokerApi", false);
}
+ @Inject(optional = true)
+ public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+ if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+ BrokerApi api = previousBrokerApi.get();
+ this.activeConsumers = api.topicSubscribers();
+ }
+ }
+
@Override
protected void configure() {
@@ -111,6 +126,7 @@
bind(Properties.class)
.annotatedWith(BaseProperties.class)
.toProvider(BasePropertiesProvider.class);
+ bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
if (brokerApiEnabled) {
install(rabbitMqBrokerApiModule);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
index 879b9b6..ed72e1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
@@ -18,6 +18,7 @@
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
@@ -30,12 +31,14 @@
import org.apache.commons.lang3.NotImplementedException;
public class RabbitMqBrokerApi implements BrokerApi {
+ private final FluentLogger logger = FluentLogger.forEnclosingClass();
private final BrokerApiPublisher publisher;
private final BrokerApiSubscribers subscribers;
private final Set<TopicSubscriber> topicSubscribers;
@Inject
public RabbitMqBrokerApi(BrokerApiPublisher publisher, BrokerApiSubscribers subscribers) {
+ logger.atFine().log("Initializing RabbitMQBrokerApi");
this.publisher = publisher;
this.subscribers = subscribers;
this.topicSubscribers = Collections.synchronizedSet(new HashSet<>());
@@ -62,6 +65,7 @@
@Override
public void disconnect() {
+ logger.atInfo().log("Disconnecting from broker and cancelling all consumers");
for (TopicSubscriber topicSubscriber : topicSubscribers) {
subscribers.removeSubscriber(topicSubscriber);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
index f022b6c..0f0b86c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
@@ -15,29 +15,37 @@
package com.googlesource.gerrit.plugins.rabbitmq;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
+import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiProperties;
import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiPropertiesProvider;
+import java.util.Set;
@Singleton
public class RabbitMqBrokerApiModule extends LifecycleModule {
+ private final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
@Inject
- public RabbitMqBrokerApiModule() {}
+ public RabbitMqBrokerApiModule() {
+ logger.atFine().log("RabbitMqBrokerApiModule loaded");
+ }
@Override
protected void configure() {
- DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerApiManager.class);
bind(Properties.class)
.annotatedWith(BrokerApiProperties.class)
.toProvider(BrokerApiPropertiesProvider.class);
-
- DynamicItem.bind(binder(), BrokerApi.class).to(RabbitMqBrokerApi.class);
+ DynamicItem.bind(binder(), BrokerApi.class).to(RabbitMqBrokerApi.class).in(Scopes.SINGLETON);
+ DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerApiManager.class);
}
}