Register Kafka consumers with external groupId when plugin starts

Link consumers with an external groupId (which were
instantiated by a previous plugin) during the binding phase.
All existing subscribers, including the ones with a custom consumer
group-id, are detached from the previous broker and associated
with a Kafka subscriber

Bug: Issue 299327285
Change-Id: I73a01389866efa6256458939303c278a0ec60115
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index 4dc394b..9ed5165 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -15,7 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -27,13 +29,19 @@
 
   private final KafkaPublisher publisher;
   private final Set<TopicSubscriber> consumers;
-  private final BrokerApi brokerApi;
+  private final Set<TopicSubscriberWithGroupId> consumersWithGroupId;
+  private final ExtendedBrokerApi brokerApi;
 
   @Inject
-  public Manager(KafkaPublisher publisher, Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
+  public Manager(
+      KafkaPublisher publisher,
+      Set<TopicSubscriber> consumers,
+      Set<TopicSubscriberWithGroupId> consumersWithGroupId,
+      BrokerApi brokerApi) {
     this.publisher = publisher;
     this.consumers = consumers;
-    this.brokerApi = brokerApi;
+    this.brokerApi = (ExtendedBrokerApi) brokerApi;
+    this.consumersWithGroupId = consumersWithGroupId;
   }
 
   @Override
@@ -42,6 +50,15 @@
     consumers.forEach(
         topicSubscriber ->
             brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+
+    consumersWithGroupId.forEach(
+        topicSubscriberWithGroupId -> {
+          TopicSubscriber topicSubscriber = topicSubscriberWithGroupId.topicSubscriber();
+          brokerApi.receiveAsync(
+              topicSubscriber.topic(),
+              topicSubscriberWithGroupId.groupId(),
+              topicSubscriber.consumer());
+        });
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 7828085..d3ee3f1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,7 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
@@ -41,6 +43,7 @@
 @Singleton
 public class KafkaApiModule extends LifecycleModule {
   private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+  private Set<TopicSubscriberWithGroupId> activeConsumersWithGroupId = Sets.newHashSet();
   private WorkQueue workQueue;
   private KafkaSubscriberProperties configuration;
 
@@ -53,7 +56,11 @@
   @Inject(optional = true)
   public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
     if (previousBrokerApi != null && previousBrokerApi.get() != null) {
-      this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+      BrokerApi api = previousBrokerApi.get();
+      if (api instanceof ExtendedBrokerApi) {
+        this.activeConsumersWithGroupId = ((ExtendedBrokerApi) api).topicSubscribersWithGroupId();
+      }
+      this.activeConsumers = api.topicSubscribers();
     }
   }
 
@@ -85,6 +92,8 @@
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
     bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
     bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+    bind(new TypeLiteral<Set<TopicSubscriberWithGroupId>>() {})
+        .toInstance(activeConsumersWithGroupId);
 
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
   }