Make KafkaBrokerApi class implement ExtendedBrokerApi interface

Previously, the KafkaBrokerApi Java class permitted the retrieval of
Kafka messages by utilising a groupId configured within Gerrit.
Nevertheless, this capability has now been improved to provide the
flexibility of directly specifying the groupId. Simultaneously, it
also enables the listing of subscribers along with their respective
consumer group identifiers.

Bug: Issue 299327285
Change-Id: Icc9267681725e2c1af0270402b3ac1ec73e3a14a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index ca6c45d..7828085 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -25,6 +25,7 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
@@ -61,10 +62,16 @@
     ClientType clientType = configuration.getClientType();
     switch (clientType) {
       case NATIVE:
-        bind(KafkaEventSubscriber.class).to(KafkaEventNativeSubscriber.class);
+        install(
+            new FactoryModuleBuilder()
+                .implement(KafkaEventSubscriber.class, KafkaEventNativeSubscriber.class)
+                .build(KafkaEventSubscriber.Factory.class));
         break;
       case REST:
-        bind(KafkaEventSubscriber.class).to(KafkaEventRestSubscriber.class);
+        install(
+            new FactoryModuleBuilder()
+                .implement(KafkaEventSubscriber.class, KafkaEventRestSubscriber.class)
+                .build(KafkaEventSubscriber.Factory.class));
         break;
       default:
         throw new IllegalArgumentException("Unsupported Kafka client type " + clientType);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 3ec21e0..d1d6961 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -14,31 +14,32 @@
 
 package com.googlesource.gerrit.plugins.kafka.api;
 
-import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-public class KafkaBrokerApi implements BrokerApi {
+public class KafkaBrokerApi implements ExtendedBrokerApi {
 
   private final KafkaPublisher publisher;
-  private final Provider<KafkaEventSubscriber> subscriberProvider;
+  private final KafkaEventSubscriber.Factory kafkaEventSubscriberFactory;
   private List<KafkaEventSubscriber> subscribers;
 
   @Inject
   public KafkaBrokerApi(
-      KafkaPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
+      KafkaPublisher publisher, KafkaEventSubscriber.Factory kafkaEventSubscriberFactory) {
     this.publisher = publisher;
-    this.subscriberProvider = subscriberProvider;
+    this.kafkaEventSubscriberFactory = kafkaEventSubscriberFactory;
     subscribers = new ArrayList<>();
   }
 
@@ -49,11 +50,12 @@
 
   @Override
   public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
-    KafkaEventSubscriber subscriber = subscriberProvider.get();
-    synchronized (subscribers) {
-      subscribers.add(subscriber);
-    }
-    subscriber.subscribe(topic, eventConsumer);
+    receiveAsync(topic, eventConsumer, Optional.empty());
+  }
+
+  @Override
+  public void receiveAsync(String topic, String groupId, Consumer<Event> eventConsumer) {
+    receiveAsync(topic, eventConsumer, Optional.ofNullable(groupId));
   }
 
   @Override
@@ -67,14 +69,36 @@
   @Override
   public Set<TopicSubscriber> topicSubscribers() {
     return subscribers.stream()
+        .filter(s -> !s.getExternalGroupId().isPresent())
         .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
         .collect(Collectors.toSet());
   }
 
   @Override
+  public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+    return subscribers.stream()
+        .filter(s -> s.getExternalGroupId().isPresent())
+        .map(
+            s ->
+                TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+                    s.getExternalGroupId().get(),
+                    TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor())))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
   public void replayAllEvents(String topic) {
     subscribers.stream()
         .filter(subscriber -> topic.equals(subscriber.getTopic()))
         .forEach(subscriber -> subscriber.resetOffset());
   }
+
+  private void receiveAsync(
+      String topic, Consumer<Event> eventConsumer, Optional<String> externalGroupId) {
+    KafkaEventSubscriber subscriber = kafkaEventSubscriberFactory.create(externalGroupId);
+    synchronized (subscribers) {
+      subscribers.add(subscriber);
+    }
+    subscriber.subscribe(topic, eventConsumer);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
index 9ec109d..43bdc73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
@@ -32,6 +32,11 @@
   }
 
   public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer) {
+    return create(config, keyDeserializer);
+  }
+
+  public Consumer<byte[], byte[]> create(
+      KafkaSubscriberProperties config, Deserializer<byte[]> keyDeserializer) {
     return new KafkaConsumer<>(config, keyDeserializer, new ByteArrayDeserializer());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
index 528cd00..f8809a6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
@@ -20,10 +20,12 @@
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +53,7 @@
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
   private volatile ReceiverJob receiver;
+  private final Optional<String> externalGroupId;
 
   @Inject
   public KafkaEventNativeSubscriber(
@@ -60,15 +63,18 @@
       Deserializer<Event> valueDeserializer,
       OneOffRequestContext oneOffCtx,
       @ConsumerExecutor ExecutorService executor,
-      KafkaEventSubscriberMetrics subscriberMetrics) {
+      KafkaEventSubscriberMetrics subscriberMetrics,
+      @Assisted Optional<String> externalGroupId) {
 
-    this.configuration = configuration;
     this.oneOffCtx = oneOffCtx;
     this.executor = executor;
     this.subscriberMetrics = subscriberMetrics;
     this.consumerFactory = consumerFactory;
     this.keyDeserializer = keyDeserializer;
     this.valueDeserializer = valueDeserializer;
+    this.externalGroupId = externalGroupId;
+    this.configuration = (KafkaSubscriberProperties) configuration.clone();
+    externalGroupId.ifPresent(gid -> this.configuration.setProperty("group.id", gid));
   }
 
   /* (non-Javadoc)
@@ -79,16 +85,16 @@
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     logger.atInfo().log(
-        "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
-    runReceiver();
+        "Kafka consumer subscribing to topic alias [%s] for event topic [%s] with groupId [%s]",
+        topic, topic, configuration.getGroupId());
+    runReceiver(consumerFactory.create(configuration, keyDeserializer));
   }
 
-  private void runReceiver() {
+  private void runReceiver(Consumer<byte[], byte[]> consumer) {
     final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread()
           .setContextClassLoader(KafkaEventNativeSubscriber.class.getClassLoader());
-      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
       consumer.subscribe(Collections.singleton(topic));
       receiver = new ReceiverJob(consumer);
       executor.execute(receiver);
@@ -130,6 +136,11 @@
     resetOffset.set(true);
   }
 
+  @Override
+  public Optional<String> getExternalGroupId() {
+    return externalGroupId;
+  }
+
   private class ReceiverJob implements Runnable {
     private final Consumer<byte[], byte[]> consumer;
 
@@ -201,7 +212,7 @@
           DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
               + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
       Thread.sleep(reconnectDelay);
-      runReceiver();
+      runReceiver(consumerFactory.create(configuration, keyDeserializer));
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
index 7429991..469e43e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -26,6 +26,7 @@
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
@@ -38,6 +39,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -83,6 +85,7 @@
   private final AtomicBoolean resetOffset;
   private final long restClientTimeoutMs;
   private volatile ReceiverJob receiver;
+  private final Optional<String> externalGroupId;
 
   @Inject
   public KafkaEventRestSubscriber(
@@ -91,13 +94,16 @@
       OneOffRequestContext oneOffCtx,
       @ConsumerExecutor ExecutorService executor,
       KafkaEventSubscriberMetrics subscriberMetrics,
-      KafkaRestClient.Factory restClientFactory) {
+      KafkaRestClient.Factory restClientFactory,
+      @Assisted Optional<String> externalGroupId) {
 
-    this.configuration = configuration;
     this.oneOffCtx = oneOffCtx;
     this.executor = executor;
     this.subscriberMetrics = subscriberMetrics;
     this.valueDeserializer = valueDeserializer;
+    this.externalGroupId = externalGroupId;
+    this.configuration = (KafkaSubscriberProperties) configuration.clone();
+    externalGroupId.ifPresent(gid -> this.configuration.setProperty("group.id", gid));
 
     gson = new Gson();
     restClient = restClientFactory.create(configuration);
@@ -113,7 +119,8 @@
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     logger.atInfo().log(
-        "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
+        "Kafka consumer subscribing to topic alias [%s] for event topic [%s] with groupId [%s]",
+        topic, topic, configuration.getGroupId());
     try {
       runReceiver();
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -163,6 +170,11 @@
     resetOffset.set(true);
   }
 
+  @Override
+  public Optional<String> getExternalGroupId() {
+    return externalGroupId;
+  }
+
   private class ReceiverJob implements Runnable {
     private final ListenableFuture<URI> kafkaRestConsumerUri;
     private final ListenableFuture<?> kafkaSubscriber;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 34c64b2..1530be7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -15,10 +15,15 @@
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
 import com.google.gerrit.server.events.Event;
+import java.util.Optional;
 
 /** Generic interface to a Kafka topic subscriber. */
 public interface KafkaEventSubscriber {
 
+  public interface Factory {
+    KafkaEventSubscriber create(Optional<String> externalGroupId);
+  }
+
   /**
    * Subscribe to a topic and receive messages asynchronously.
    *
@@ -46,4 +51,12 @@
 
   /** Reset the offset for reading incoming Kafka messages of the topic. */
   void resetOffset();
+
+  /**
+   * Returns the external consumer's group id when it is defined.
+   *
+   * @return Optional instance with external consumer's group id otherwise an empty Optional
+   *     instance
+   */
+  Optional<String> getExternalGroupId();
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 70a664b..1a3507d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,6 +19,7 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -52,8 +53,9 @@
 @TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
   static final Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(10);
-
+  private static final Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofSeconds(30);
   private KafkaContainer kafka;
+  private final Gson gson = new EventGsonProvider().get();
 
   @Override
   public void setUpTestPlugin() throws Exception {
@@ -119,7 +121,6 @@
     assertThat(events).hasSize(6);
     String commentAddedEventJson = Iterables.getLast(events);
 
-    Gson gson = new EventGsonProvider().get();
     Event event = gson.fromJson(commentAddedEventJson, Event.class);
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
@@ -137,13 +138,36 @@
       name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
   @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
+  public void consumeEventsWithExternalGroupId() throws Exception {
+    String topic = "a_topic";
+    String consumerGroup1 = "consumer-group-1";
+    Event eventMessage = new ProjectCreatedEvent();
+    eventMessage.instanceId = "test-instance-id-1";
+    List<Event> receivedEventsWithGroupId1 = new ArrayList<>();
+
+    ExtendedBrokerApi kafkaBrokerApi = ((ExtendedBrokerApi) kafkaBrokerApi());
+    kafkaBrokerApi.send(topic, eventMessage);
+    kafkaBrokerApi.receiveAsync(topic, consumerGroup1, receivedEventsWithGroupId1::add);
+
+    waitUntil(() -> receivedEventsWithGroupId1.size() == 1, WAIT_FOR_POLL_TIMEOUT);
+    assertThat(gson.toJson(receivedEventsWithGroupId1.get(0))).isEqualTo(gson.toJson(eventMessage));
+  }
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
   public void shouldReplayAllEvents() throws InterruptedException {
     String topic = "a_topic";
     Event eventMessage = new ProjectCreatedEvent();
     eventMessage.instanceId = "test-instance-id";
 
-    Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofSeconds(30);
-
     List<Event> receivedEvents = new ArrayList<>();
 
     BrokerApi kafkaBrokerApi = kafkaBrokerApi();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 1991467..3f83e92 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.kafka.api;
 
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId.topicSubscriberWithGroupId;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -52,7 +54,9 @@
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -84,6 +88,8 @@
   private Gson gson;
   protected ClientType clientType;
 
+  @Rule public TestName name = new TestName();
+
   public static class TestWorkQueue extends WorkQueue {
 
     @Inject
@@ -232,7 +238,7 @@
         new KafkaProperties(
             false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic_sync";
+    String testTopic = testTopic();
     TestConsumer testConsumer = new TestConsumer(1);
     Event testEventMessage = new ProjectCreatedEvent();
     testEventMessage.instanceId = TEST_INSTANCE_ID;
@@ -247,13 +253,17 @@
     assertNoMoreExpectedMessages(testConsumer);
   }
 
+  private String testTopic() {
+    return "test_topic_" + name.getMethodName();
+  }
+
   @Test
   public void shouldSendAsyncAndReceiveToTopic() {
     connectToKafka(
         new KafkaProperties(
             true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic_async";
+    String testTopic = testTopic();
     TestConsumer testConsumer = new TestConsumer(1);
     Event testEventMessage = new ProjectCreatedEvent();
     testEventMessage.instanceId = TEST_INSTANCE_ID;
@@ -274,7 +284,7 @@
         new KafkaProperties(
             false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic_reset";
+    String testTopic = testTopic();
     Event testEventMessage = new ProjectCreatedEvent();
 
     TestConsumer testConsumer = new TestConsumer(2);
@@ -291,6 +301,128 @@
     assertThat(gson.toJson(testConsumer.messages.get(1))).isEqualTo(gson.toJson(testEventMessage));
   }
 
+  @Test
+  public void shouldConsumerWithGroupIdConsumeMessage() {
+    connectToKafka(
+        new KafkaProperties(
+            true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    TestConsumer testConsumer = new TestConsumer(1);
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    kafkaBrokerApi.receiveAsync(testTopic, "group-id-1", testConsumer);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+
+    assertNoMoreExpectedMessages(testConsumer);
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithoutExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers())
+        .containsExactly(topicSubscriber(testTopic, testConsumer));
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer)));
+  }
+
+  @Test
+  public void shouldRegisterDifferentConsumersWithTheSameExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer1 = new TestConsumer(1);
+    TestConsumer testConsumer2 = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer1);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer2);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer1)),
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer2)));
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithConfiguredGroupIdAndConsumerWithExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer1 = new TestConsumer(1);
+    TestConsumer testConsumer2 = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer1);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer2);
+    assertThat(kafkaBrokerApi.topicSubscribers())
+        .containsExactly(topicSubscriber(testTopic, testConsumer1));
+
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer2)));
+  }
+
+  @Test
+  public void shouldNotRegisterTheSameConsumerWithExternalGroupIdTwicePerTopic() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer)));
+  }
+
   protected String getKafkaRestApiUriString() {
     return null;
   }