Merge of ExtendedBrokerApi into BrokerApi

Remove all references to the legacy ExtendedBrokerApi and
use the new updated BrokerApi, including the implementation
of the disconnect by topic/groupId which allows to isolate
the disconnection of a plugin's subscribers without impacting
all other subscribers.

Bug:Issue 327226782
Change-Id: Id3ca8efb9e5432b53b954bead0ff5dea82f3d041
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index 9ed5165..7123d7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.gerrit.extensions.events.LifecycleListener;
@@ -30,7 +29,7 @@
   private final KafkaPublisher publisher;
   private final Set<TopicSubscriber> consumers;
   private final Set<TopicSubscriberWithGroupId> consumersWithGroupId;
-  private final ExtendedBrokerApi brokerApi;
+  private final BrokerApi brokerApi;
 
   @Inject
   public Manager(
@@ -40,7 +39,7 @@
       BrokerApi brokerApi) {
     this.publisher = publisher;
     this.consumers = consumers;
-    this.brokerApi = (ExtendedBrokerApi) brokerApi;
+    this.brokerApi = brokerApi;
     this.consumersWithGroupId = consumersWithGroupId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index d3ee3f1..e566204 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.collect.Sets;
@@ -57,9 +56,7 @@
   public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
     if (previousBrokerApi != null && previousBrokerApi.get() != null) {
       BrokerApi api = previousBrokerApi.get();
-      if (api instanceof ExtendedBrokerApi) {
-        this.activeConsumersWithGroupId = ((ExtendedBrokerApi) api).topicSubscribersWithGroupId();
-      }
+      this.activeConsumersWithGroupId = api.topicSubscribersWithGroupId();
       this.activeConsumers = api.topicSubscribers();
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index d1d6961..06f1ac6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -14,22 +14,24 @@
 
 package com.googlesource.gerrit.plugins.kafka.api;
 
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-public class KafkaBrokerApi implements ExtendedBrokerApi {
+public class KafkaBrokerApi implements BrokerApi {
 
   private final KafkaPublisher publisher;
   private final KafkaEventSubscriber.Factory kafkaEventSubscriberFactory;
@@ -40,7 +42,7 @@
       KafkaPublisher publisher, KafkaEventSubscriber.Factory kafkaEventSubscriberFactory) {
     this.publisher = publisher;
     this.kafkaEventSubscriberFactory = kafkaEventSubscriberFactory;
-    subscribers = new ArrayList<>();
+    subscribers = Collections.synchronizedList(new ArrayList<>());
   }
 
   @Override
@@ -67,6 +69,18 @@
   }
 
   @Override
+  public void disconnect(String topic, @Nullable String groupId) {
+    Set<KafkaEventSubscriber> subscribersToDisconnect =
+        subscribers.stream()
+            .filter(s -> topic.equals(s.getTopic()))
+            .filter(
+                s -> groupId == null || s.getExternalGroupId().stream().anyMatch(groupId::equals))
+            .collect(Collectors.toSet());
+    subscribersToDisconnect.forEach(KafkaEventSubscriber::shutdown);
+    subscribers.removeAll(subscribersToDisconnect);
+  }
+
+  @Override
   public Set<TopicSubscriber> topicSubscribers() {
     return subscribers.stream()
         .filter(s -> !s.getExternalGroupId().isPresent())
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 028d589..d9e82e3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,7 +19,6 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -143,7 +142,7 @@
     eventMessage.instanceId = "test-instance-id-1";
     List<Event> receivedEventsWithGroupId1 = new ArrayList<>();
 
-    ExtendedBrokerApi kafkaBrokerApi = ((ExtendedBrokerApi) kafkaBrokerApi());
+    BrokerApi kafkaBrokerApi = kafkaBrokerApi();
     kafkaBrokerApi.send(topic, eventMessage);
     kafkaBrokerApi.receiveAsync(topic, consumerGroup1, receivedEventsWithGroupId1::add);