Add consumers from previous broker API implementations

The event-broker lib provides a default brokerapi implementation to
facilitate startup before the real implementation is available. We
should adopt all consumers that have registered to this or any other
previous brokerapi when events-rabbitmq is loaded.
This way we can safely load events-rabbitmq after plugins that sets
up consumers without loosing events.

Added some logging to help understanding load orders.

Draws heavy inspiration from events-kafka's implementation.

Change-Id: Iaffdbc662bee17de1714cbe19048c2d740805984
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
index 83a61f9..8f982fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/BrokerApiManager.java
@@ -14,29 +14,51 @@
 
 package com.googlesource.gerrit.plugins.rabbitmq;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiPublisher;
 import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiSubscribers;
+import com.google.common.flogger.FluentLogger;
+import java.util.Set;
 
 @Singleton
 public class BrokerApiManager implements LifecycleListener {
 
+  private final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final BrokerApiPublisher publisher;
   private final BrokerApiSubscribers subscribers;
+  private final Set<TopicSubscriber> consumers;
+  private final BrokerApi brokerApi;
 
   @Inject
-  public BrokerApiManager(BrokerApiPublisher publisher, BrokerApiSubscribers subscribers) {
+  public BrokerApiManager(
+      BrokerApiPublisher publisher,
+      BrokerApiSubscribers subscribers,
+      Set<TopicSubscriber> consumers,
+      BrokerApi brokerApi) {
+    logger.atFine().log("BrokerApiManager Initialized");
     this.publisher = publisher;
     this.subscribers = subscribers;
+    this.consumers = consumers;
+    this.brokerApi = brokerApi;
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    logger.atInfo().log("BrokerApiManager started and loading existing subscribers");
+    consumers.forEach(
+        topicSubscriber ->
+            brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+    logger.atInfo().log(
+        "RabbitMQ broker started with %d topic subscribers", brokerApi.topicSubscribers().size());
+  }
 
   @Override
   public void stop() {
+    logger.atInfo().log("BrokerApiManager Stopping");
     subscribers.stop();
     publisher.stop();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
index b9125ee..56acd18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/Module.java
@@ -14,16 +14,21 @@
 
 package com.googlesource.gerrit.plugins.rabbitmq;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
 import com.googlesource.gerrit.plugins.rabbitmq.config.PluginProperties;
@@ -52,6 +57,7 @@
 import com.googlesource.gerrit.plugins.rabbitmq.worker.UserEventWorker;
 import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
@@ -61,6 +67,7 @@
 
   private final RabbitMqBrokerApiModule rabbitMqBrokerApiModule;
   private final boolean brokerApiEnabled;
+  private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
 
   @Inject
   public Module(
@@ -72,6 +79,14 @@
         getBaseConfig(pluginName, pluginData).getBoolean("General", "enableBrokerApi", false);
   }
 
+  @Inject(optional = true)
+  public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+    if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+      BrokerApi api = previousBrokerApi.get();
+      this.activeConsumers = api.topicSubscribers();
+    }
+  }
+
   @Override
   protected void configure() {
 
@@ -111,6 +126,7 @@
     bind(Properties.class)
         .annotatedWith(BaseProperties.class)
         .toProvider(BasePropertiesProvider.class);
+    bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
 
     if (brokerApiEnabled) {
       install(rabbitMqBrokerApiModule);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
index 879b9b6..ed72e1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApi.java
@@ -18,6 +18,7 @@
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
@@ -30,12 +31,14 @@
 import org.apache.commons.lang3.NotImplementedException;
 
 public class RabbitMqBrokerApi implements BrokerApi {
+  private final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final BrokerApiPublisher publisher;
   private final BrokerApiSubscribers subscribers;
   private final Set<TopicSubscriber> topicSubscribers;
 
   @Inject
   public RabbitMqBrokerApi(BrokerApiPublisher publisher, BrokerApiSubscribers subscribers) {
+    logger.atFine().log("Initializing RabbitMQBrokerApi");
     this.publisher = publisher;
     this.subscribers = subscribers;
     this.topicSubscribers = Collections.synchronizedSet(new HashSet<>());
@@ -62,6 +65,7 @@
 
   @Override
   public void disconnect() {
+    logger.atInfo().log("Disconnecting from broker and cancelling all consumers");
     for (TopicSubscriber topicSubscriber : topicSubscribers) {
       subscribers.removeSubscriber(topicSubscriber);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
index f022b6c..0f0b86c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/RabbitMqBrokerApiModule.java
@@ -15,29 +15,37 @@
 package com.googlesource.gerrit.plugins.rabbitmq;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
+import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
 import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiProperties;
 import com.googlesource.gerrit.plugins.rabbitmq.message.BrokerApiPropertiesProvider;
+import java.util.Set;
 
 @Singleton
 public class RabbitMqBrokerApiModule extends LifecycleModule {
+  private final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
 
   @Inject
-  public RabbitMqBrokerApiModule() {}
+  public RabbitMqBrokerApiModule() {
+    logger.atFine().log("RabbitMqBrokerApiModule loaded");
+  }
 
   @Override
   protected void configure() {
-    DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerApiManager.class);
     bind(Properties.class)
         .annotatedWith(BrokerApiProperties.class)
         .toProvider(BrokerApiPropertiesProvider.class);
-
-    DynamicItem.bind(binder(), BrokerApi.class).to(RabbitMqBrokerApi.class);
+    DynamicItem.bind(binder(), BrokerApi.class).to(RabbitMqBrokerApi.class).in(Scopes.SINGLETON);
+    DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerApiManager.class);
   }
 }