Register Kafka consumers with external groupId when plugin starts
Link consumers with an external groupId (which were
instantiated by a previous plugin) during the binding phase.
All existing subscribers, including the ones with a custom consumer
group-id, are detached from the previous broker and associated
with a Kafka subscriber
Bug: Issue 299327285
Change-Id: I73a01389866efa6256458939303c278a0ec60115
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index 4dc394b..9ed5165 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -15,7 +15,9 @@
package com.googlesource.gerrit.plugins.kafka;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -27,13 +29,19 @@
private final KafkaPublisher publisher;
private final Set<TopicSubscriber> consumers;
- private final BrokerApi brokerApi;
+ private final Set<TopicSubscriberWithGroupId> consumersWithGroupId;
+ private final ExtendedBrokerApi brokerApi;
@Inject
- public Manager(KafkaPublisher publisher, Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
+ public Manager(
+ KafkaPublisher publisher,
+ Set<TopicSubscriber> consumers,
+ Set<TopicSubscriberWithGroupId> consumersWithGroupId,
+ BrokerApi brokerApi) {
this.publisher = publisher;
this.consumers = consumers;
- this.brokerApi = brokerApi;
+ this.brokerApi = (ExtendedBrokerApi) brokerApi;
+ this.consumersWithGroupId = consumersWithGroupId;
}
@Override
@@ -42,6 +50,15 @@
consumers.forEach(
topicSubscriber ->
brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+
+ consumersWithGroupId.forEach(
+ topicSubscriberWithGroupId -> {
+ TopicSubscriber topicSubscriber = topicSubscriberWithGroupId.topicSubscriber();
+ brokerApi.receiveAsync(
+ topicSubscriber.topic(),
+ topicSubscriberWithGroupId.groupId(),
+ topicSubscriber.consumer());
+ });
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 7828085..d3ee3f1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,7 +15,9 @@
package com.googlesource.gerrit.plugins.kafka.api;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.lifecycle.LifecycleModule;
@@ -41,6 +43,7 @@
@Singleton
public class KafkaApiModule extends LifecycleModule {
private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+ private Set<TopicSubscriberWithGroupId> activeConsumersWithGroupId = Sets.newHashSet();
private WorkQueue workQueue;
private KafkaSubscriberProperties configuration;
@@ -53,7 +56,11 @@
@Inject(optional = true)
public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
if (previousBrokerApi != null && previousBrokerApi.get() != null) {
- this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+ BrokerApi api = previousBrokerApi.get();
+ if (api instanceof ExtendedBrokerApi) {
+ this.activeConsumersWithGroupId = ((ExtendedBrokerApi) api).topicSubscribersWithGroupId();
+ }
+ this.activeConsumers = api.topicSubscribers();
}
}
@@ -85,6 +92,8 @@
bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+ bind(new TypeLiteral<Set<TopicSubscriberWithGroupId>>() {})
+ .toInstance(activeConsumersWithGroupId);
DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
}