Merge of ExtendedBrokerApi into BrokerApi
Remove all references to the legacy ExtendedBrokerApi and
use the new updated BrokerApi, including the implementation
of the disconnect by topic/groupId which allows to isolate
the disconnection of a plugin's subscribers without impacting
all other subscribers.
Bug:Issue 327226782
Change-Id: Id3ca8efb9e5432b53b954bead0ff5dea82f3d041
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index 9ed5165..7123d7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.kafka;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.gerrit.extensions.events.LifecycleListener;
@@ -30,7 +29,7 @@
private final KafkaPublisher publisher;
private final Set<TopicSubscriber> consumers;
private final Set<TopicSubscriberWithGroupId> consumersWithGroupId;
- private final ExtendedBrokerApi brokerApi;
+ private final BrokerApi brokerApi;
@Inject
public Manager(
@@ -40,7 +39,7 @@
BrokerApi brokerApi) {
this.publisher = publisher;
this.consumers = consumers;
- this.brokerApi = (ExtendedBrokerApi) brokerApi;
+ this.brokerApi = brokerApi;
this.consumersWithGroupId = consumersWithGroupId;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index d3ee3f1..e566204 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.kafka.api;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.collect.Sets;
@@ -57,9 +56,7 @@
public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
if (previousBrokerApi != null && previousBrokerApi.get() != null) {
BrokerApi api = previousBrokerApi.get();
- if (api instanceof ExtendedBrokerApi) {
- this.activeConsumersWithGroupId = ((ExtendedBrokerApi) api).topicSubscribersWithGroupId();
- }
+ this.activeConsumersWithGroupId = api.topicSubscribersWithGroupId();
this.activeConsumers = api.topicSubscribers();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index d1d6961..06f1ac6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -14,22 +14,24 @@
package com.googlesource.gerrit.plugins.kafka.api;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-public class KafkaBrokerApi implements ExtendedBrokerApi {
+public class KafkaBrokerApi implements BrokerApi {
private final KafkaPublisher publisher;
private final KafkaEventSubscriber.Factory kafkaEventSubscriberFactory;
@@ -40,7 +42,7 @@
KafkaPublisher publisher, KafkaEventSubscriber.Factory kafkaEventSubscriberFactory) {
this.publisher = publisher;
this.kafkaEventSubscriberFactory = kafkaEventSubscriberFactory;
- subscribers = new ArrayList<>();
+ subscribers = Collections.synchronizedList(new ArrayList<>());
}
@Override
@@ -67,6 +69,18 @@
}
@Override
+ public void disconnect(String topic, @Nullable String groupId) {
+ Set<KafkaEventSubscriber> subscribersToDisconnect =
+ subscribers.stream()
+ .filter(s -> topic.equals(s.getTopic()))
+ .filter(
+ s -> groupId == null || s.getExternalGroupId().stream().anyMatch(groupId::equals))
+ .collect(Collectors.toSet());
+ subscribersToDisconnect.forEach(KafkaEventSubscriber::shutdown);
+ subscribers.removeAll(subscribersToDisconnect);
+ }
+
+ @Override
public Set<TopicSubscriber> topicSubscribers() {
return subscribers.stream()
.filter(s -> !s.getExternalGroupId().isPresent())
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 028d589..d9e82e3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,7 +19,6 @@
import static org.junit.Assert.fail;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -143,7 +142,7 @@
eventMessage.instanceId = "test-instance-id-1";
List<Event> receivedEventsWithGroupId1 = new ArrayList<>();
- ExtendedBrokerApi kafkaBrokerApi = ((ExtendedBrokerApi) kafkaBrokerApi());
+ BrokerApi kafkaBrokerApi = kafkaBrokerApi();
kafkaBrokerApi.send(topic, eventMessage);
kafkaBrokerApi.receiveAsync(topic, consumerGroup1, receivedEventsWithGroupId1::add);