Adapt to the latest BrokerApi interface

The latest BrokerApiInterface requires the implementation of three
new interfaces :

- receiveAsync(String streamName, String groupId, Consumer<Event> consumer)
- disconnect(String topic, String groupId)
- Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId()

Implement the above interfaces for kinesis, so that kinesis streams
subscriptions can be managed by providing an explicit GroupId.

Bug: Issue 327473326
Change-Id: I5f5849c1ffd0a4a3a760794aae4589e99cff0cee
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
index 86fad53..df03b61 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
@@ -55,8 +55,8 @@
     this.dynamoDbAsyncClient = dynamoDbAsyncClient;
   }
 
-  public void setAllShardsToBeginning(String streamName) {
-    String leaseTable = cosumerLeaseName(configuration.getApplicationName(), streamName);
+  public void setAllShardsToBeginning(String streamName, String groupId) {
+    String leaseTable = cosumerLeaseName(groupId, streamName);
 
     try {
       for (String shard : getAllShards(leaseTable)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 89476cf..6291152 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -228,8 +228,8 @@
         : System.getProperty(name);
   }
 
-  public static String cosumerLeaseName(String applicationName, String streamName) {
-    return String.format("%s-%s", applicationName, streamName);
+  public static String cosumerLeaseName(String groupId, String streamName) {
+    return String.format("%s-%s", groupId, streamName);
   }
 
   public Long getShutdownTimeoutMs() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 0316c06..675d801 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -16,10 +16,13 @@
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
@@ -27,15 +30,19 @@
 
 class KinesisBrokerApi implements BrokerApi {
   private final KinesisConsumer.Factory consumerFactory;
+  private final Configuration configuration;
 
   private final KinesisPublisher kinesisPublisher;
   private final Set<KinesisConsumer> consumers;
 
   @Inject
   public KinesisBrokerApi(
-      KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
+      KinesisPublisher kinesisPublisher,
+      KinesisConsumer.Factory consumerFactory,
+      Configuration configuration) {
     this.kinesisPublisher = kinesisPublisher;
     this.consumerFactory = consumerFactory;
+    this.configuration = configuration;
     this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
   }
 
@@ -46,9 +53,12 @@
 
   @Override
   public void receiveAsync(String streamName, Consumer<Event> eventConsumer) {
-    KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
-    consumers.add(consumer);
-    consumer.subscribe(streamName, eventConsumer);
+    receive(streamName, eventConsumer, null);
+  }
+
+  @Override
+  public void receiveAsync(String streamName, String groupId, Consumer<Event> consumer) {
+    receive(streamName, consumer, groupId);
   }
 
   @Override
@@ -65,9 +75,40 @@
   }
 
   @Override
+  public void disconnect(String topic, String groupId) {
+    Set<KinesisConsumer> consumersOfTopic =
+        consumers
+            .parallelStream()
+            .filter(c -> topic.equals(c.getStreamName()) && groupId.equals(c.getGroupId()))
+            .collect(Collectors.toSet());
+
+    consumersOfTopic.forEach(KinesisConsumer::shutdown);
+    consumers.removeAll(consumersOfTopic);
+  }
+
+  @Override
   public void replayAllEvents(String topic) {
     consumers.stream()
         .filter(subscriber -> topic.equals(subscriber.getStreamName()))
         .forEach(KinesisConsumer::resetOffset);
   }
+
+  @Override
+  public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+    return consumers.stream()
+        .map(
+            s ->
+                TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+                    s.getGroupId(),
+                    TopicSubscriber.topicSubscriber(s.getStreamName(), s.getMessageProcessor())))
+        .collect(Collectors.toSet());
+  }
+
+  private void receive(
+      String streamName, Consumer<Event> eventConsumer, @Nullable String maybeGroupId) {
+    String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getApplicationName());
+    KinesisConsumer consumer = consumerFactory.create(streamName, groupId, eventConsumer);
+    consumers.add(consumer);
+    consumer.subscribe(streamName, eventConsumer);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index b563e26..d17595c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -17,6 +17,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -26,7 +27,10 @@
 
 class KinesisConsumer {
   interface Factory {
-    KinesisConsumer create(String topic, Consumer<Event> messageProcessor);
+    KinesisConsumer create(
+        @Assisted("topic") String topic,
+        @Assisted("groupId") String groupId,
+        Consumer<Event> messageProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -34,6 +38,8 @@
   private final CheckpointResetter checkpointResetter;
   private final Configuration configuration;
   private final ExecutorService executor;
+
+  private final String groupId;
   private Scheduler kinesisScheduler;
 
   private java.util.function.Consumer<Event> messageProcessor;
@@ -45,11 +51,13 @@
       SchedulerProvider.Factory schedulerFactory,
       CheckpointResetter checkpointResetter,
       Configuration configuration,
-      @ConsumerExecutor ExecutorService executor) {
+      @ConsumerExecutor ExecutorService executor,
+      @Assisted("groupId") String groupId) {
     this.schedulerFactory = schedulerFactory;
     this.checkpointResetter = checkpointResetter;
     this.configuration = configuration;
     this.executor = executor;
+    this.groupId = groupId;
   }
 
   public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
@@ -57,12 +65,14 @@
     this.messageProcessor = messageProcessor;
 
     logger.atInfo().log("Subscribe kinesis consumer to stream [%s]", streamName);
-    runReceiver(messageProcessor);
+    runReceiver(groupId, messageProcessor);
   }
 
-  private void runReceiver(java.util.function.Consumer<Event> messageProcessor) {
+  private void runReceiver(String groupId, java.util.function.Consumer<Event> messageProcessor) {
     this.kinesisScheduler =
-        schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
+        schedulerFactory
+            .create(streamName, groupId, resetOffset.getAndSet(false), messageProcessor)
+            .get();
     executor.execute(kinesisScheduler);
   }
 
@@ -88,10 +98,14 @@
     return streamName;
   }
 
+  public String getGroupId() {
+    return groupId;
+  }
+
   public void resetOffset() {
     // Move all checkpoints (if any) to TRIM_HORIZON, so that the consumer
     // scheduler will start consuming from beginning.
-    checkpointResetter.setAllShardsToBeginning(streamName);
+    checkpointResetter.setAllShardsToBeginning(streamName, groupId);
 
     // Even when no checkpoints have been persisted, instruct the consumer
     // scheduler to start from TRIM_HORIZON, irrespective of 'initialPosition'
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 09083df..deffd67 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -33,7 +33,8 @@
 class SchedulerProvider implements Provider<Scheduler> {
   interface Factory {
     SchedulerProvider create(
-        String streamName,
+        @Assisted("streamName") String streamName,
+        @Assisted("groupId") String groupId,
         boolean fromBeginning,
         java.util.function.Consumer<Event> messageProcessor);
   }
@@ -51,7 +52,8 @@
       DynamoDbAsyncClient dynamoDbAsyncClient,
       CloudWatchAsyncClient cloudWatchAsyncClient,
       KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
-      @Assisted String streamName,
+      @Assisted("streamName") String streamName,
+      @Assisted("groupId") String groupId,
       @Assisted boolean fromBeginning,
       @Assisted java.util.function.Consumer<Event> messageProcessor) {
     this.configuration = configuration;
@@ -61,7 +63,7 @@
     this.configsBuilder =
         new ConfigsBuilder(
             streamName,
-            cosumerLeaseName(configuration.getApplicationName(), streamName),
+            cosumerLeaseName(groupId, streamName),
             kinesisAsyncClient,
             dynamoDbAsyncClient,
             cloudWatchAsyncClient,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 7449440..7b61d10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -124,6 +124,45 @@
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldConsumeEventBySubscribingWithSpecificGroupId() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    String groupId = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, groupId, eventConsumerCounter);
+
+    Event event = eventMessage();
+    kinesisBroker().send(streamName, event);
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
+
+    assertThat(countSubscribers(streamName, groupId)).isEqualTo(1L);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldDisconnectOnlySubscriberForSpecificGroupId() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    String groupId1 = UUID.randomUUID().toString();
+    String groupId2 = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, groupId1, eventConsumerCounter);
+    kinesisBroker().receiveAsync(streamName, groupId2, eventConsumerCounter);
+
+    kinesisBroker().disconnect(streamName, groupId2);
+
+    assertThat(countSubscribers(streamName, groupId2)).isEqualTo(0L);
+    assertThat(countSubscribers(streamName, groupId1)).isEqualTo(1L);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   public void shouldConsumeAnEventWithoutInstanceId() throws Exception {
     String streamName = UUID.randomUUID().toString();
     createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
@@ -277,6 +316,12 @@
     assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
   }
 
+  private long countSubscribers(String streamName, String groupId) {
+    return kinesisBroker().topicSubscribersWithGroupId().stream()
+        .filter(t -> groupId.equals(t.groupId()) && streamName.equals(t.topicSubscriber().topic()))
+        .count();
+  }
+
   private static class EventConsumerCounter implements Consumer<Event> {
     List<Event> consumedMessages = new ArrayList<>();