Adapt to the latest BrokerApi interface

Implement three methods added to the `BrokerApi` interface on `master`
and fix compilation errors.

The added methods allow managing the subscriptions by adding the
`groupId` parameter.

Bug: Issue 327699479
Change-Id: If3e4bd131bfd492d5e452b9989ac8d975991293e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
index 17233b0..6957afc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -16,12 +16,15 @@
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
@@ -29,6 +32,7 @@
 
 class PubSubBrokerApi implements BrokerApi {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private PubSubConfiguration configuration;
   private PubSubPublisher.Factory publisherFactory;
   private PubSubEventSubscriber.Factory subscriberFactory;
   private Map<String, PubSubPublisher> publishers = new ConcurrentHashMap<>();
@@ -36,7 +40,10 @@
 
   @Inject
   public PubSubBrokerApi(
-      PubSubPublisher.Factory publisherFactory, PubSubEventSubscriber.Factory subscriberFactory) {
+      PubSubConfiguration configuration,
+      PubSubPublisher.Factory publisherFactory,
+      PubSubEventSubscriber.Factory subscriberFactory) {
+    this.configuration = configuration;
     this.publisherFactory = publisherFactory;
     this.subscriberFactory = subscriberFactory;
     subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -49,12 +56,29 @@
 
   @Override
   public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
-    PubSubEventSubscriber subscriber = subscriberFactory.create(topic, eventConsumer);
+    receiveAsync(topic, null, eventConsumer);
+  }
+
+  @Override
+  public void receiveAsync(String topic, @Nullable String maybeGroupId, Consumer<Event> consumer) {
+    String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getSubscriptionId());
+    PubSubEventSubscriber subscriber = subscriberFactory.create(topic, groupId, consumer);
     subscribers.add(subscriber);
     subscriber.subscribe();
   }
 
   @Override
+  public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+    return subscribers.stream()
+        .map(
+            s ->
+                TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+                    s.getGroupId(),
+                    TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor())))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
   public Set<TopicSubscriber> topicSubscribers() {
     return subscribers.stream()
         .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
@@ -81,6 +105,17 @@
   }
 
   @Override
+  public void disconnect(String topic, String groupId) {
+    subscribers.stream()
+        .filter(s -> s.getGroupId().equals(groupId) && topic.equals(s.getTopic()))
+        .forEach(
+            c -> {
+              subscribers.remove(c);
+              c.shutdown();
+            });
+  }
+
+  @Override
   public void replayAllEvents(String topic) {
     subscribers.stream()
         .filter(subscriber -> topic.equals(subscriber.getTopic()))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index b8f4d18..3210051 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -34,7 +34,10 @@
 public class PubSubEventSubscriber {
 
   public interface Factory {
-    public PubSubEventSubscriber create(String topic, Consumer<Event> messageProcessor);
+    public PubSubEventSubscriber create(
+        @Assisted("topic") String topic,
+        @Assisted("groupId") String groupId,
+        Consumer<Event> messageProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -43,6 +46,7 @@
   private final PubSubSubscriberMetrics subscriberMetrics;
   private final OneOffRequestContext oneOffRequestContext;
   private final String topic;
+  private final String groupId;
   private final Consumer<Event> messageProcessor;
   private final SubscriberProvider subscriberProvider;
   private final PubSubConfiguration config;
@@ -55,12 +59,14 @@
       PubSubConfiguration config,
       PubSubSubscriberMetrics subscriberMetrics,
       OneOffRequestContext oneOffRequestContext,
-      @Assisted String topic,
+      @Assisted("topic") String topic,
+      @Assisted("groupId") String groupId,
       @Assisted Consumer<Event> messageProcessor) {
     this.eventsDeserializer = eventsDeserializer;
     this.subscriberMetrics = subscriberMetrics;
     this.oneOffRequestContext = oneOffRequestContext;
     this.topic = topic;
+    this.groupId = groupId;
     this.messageProcessor = messageProcessor;
     this.subscriberProvider = subscriberProvider;
     this.config = config;
@@ -68,7 +74,7 @@
 
   public void subscribe() {
     try {
-      subscriber = subscriberProvider.get(topic, getMessageReceiver());
+      subscriber = subscriberProvider.get(topic, groupId, getMessageReceiver());
       subscriber
           .startAsync()
           .awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -83,6 +89,10 @@
     return topic;
   }
 
+  public String getGroupId() {
+    return groupId;
+  }
+
   public Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
index 31024d8..be4258f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
@@ -53,8 +53,8 @@
     this.executor = executor;
   }
 
-  public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
-    return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+  public Subscriber get(String topic, String groupId, MessageReceiver receiver) throws IOException {
+    return Subscriber.newBuilder(getOrCreateSubscription(topic, groupId).getName(), receiver)
         .setExecutorProvider(FixedExecutorProvider.create(executor))
         .setCredentialsProvider(credentials)
         .build();
@@ -65,10 +65,14 @@
   }
 
   protected Subscription getOrCreateSubscription(String topicId) throws IOException {
+    return getOrCreateSubscription(topicId, pubSubProperties.getSubscriptionId());
+  }
+
+  protected Subscription getOrCreateSubscription(String topicId, String groupId)
+      throws IOException {
     try (SubscriptionAdminClient subscriptionAdminClient =
         SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
-      String subscriptionName =
-          String.format("%s-%s", pubSubProperties.getSubscriptionId(), topicId);
+      String subscriptionName = String.format("%s-%s", groupId, topicId);
       ProjectSubscriptionName projectSubscriptionName =
           ProjectSubscriptionName.of(pubSubProperties.getGCloudProject(), subscriptionName);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
index a0da1e5..97e61f8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
@@ -50,10 +50,10 @@
   }
 
   @Override
-  public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+  public Subscriber get(String topic, String groupId, MessageReceiver receiver) throws IOException {
     TransportChannelProvider channelProvider = createChannelProvider();
     createTopic(channelProvider, pubSubProperties.getGCloudProject(), topic);
-    return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+    return Subscriber.newBuilder(getOrCreateSubscription(topic, groupId).getName(), receiver)
         .setChannelProvider(channelProvider)
         .setExecutorProvider(FixedExecutorProvider.create(executor))
         .setCredentialsProvider(credentials)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index e27e946..3654397 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -66,6 +66,7 @@
 public class PubSubBrokerApiIT extends LightweightPluginDaemonTest {
   private static final String PROJECT_ID = "test_project";
   private static final String TOPIC_ID = "test_topic";
+  private static final String GROUP_ID = "test_group";
   private static final String SUBSCRIPTION_ID = "test_subscription_id";
 
   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
@@ -203,6 +204,51 @@
         TEST_TIMEOUT);
   }
 
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
+  public void shouldConsumeEventWithGroupId() throws InterruptedException {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = DEFAULT_INSTANCE_ID;
+    String expectedMessageJson = gson.toJson(event);
+    TestConsumer consumer = new TestConsumer();
+
+    objectUnderTest.receiveAsync(TOPIC_ID, GROUP_ID, consumer);
+
+    objectUnderTest.send(TOPIC_ID, event);
+
+    assertThat(countSubscribers()).isEqualTo(1);
+    WaitUtil.waitUntil(
+        () ->
+            consumer.getMessage() != null
+                && expectedMessageJson.equals(gson.toJson(consumer.getMessage())),
+        TEST_TIMEOUT);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
+  public void shouldDisconnectOnlySubscriberForSpecificGroupId() throws Exception {
+    String streamName = "stream_name";
+    String groupId1 = "group1";
+    String groupId2 = "group2";
+    TestConsumer consumer = new TestConsumer();
+
+    objectUnderTest.receiveAsync(streamName, groupId1, consumer);
+    objectUnderTest.receiveAsync(streamName, groupId2, consumer);
+
+    objectUnderTest.disconnect(streamName, groupId2);
+
+    assertThat(countSubscribers(streamName, groupId2)).isEqualTo(0L);
+    assertThat(countSubscribers(streamName, groupId1)).isEqualTo(1L);
+  }
+
   private void readMessageAndValidate(Consumer<PullResponse> validate) throws IOException {
     readMessageAndValidate(validate, PROJECT_ID, SUBSCRIPTION_ID);
   }
@@ -266,6 +312,16 @@
         .getName();
   }
 
+  private long countSubscribers() {
+    return countSubscribers(TOPIC_ID, GROUP_ID);
+  }
+
+  private long countSubscribers(String topicId, String groupId) {
+    return objectUnderTest.topicSubscribersWithGroupId().stream()
+        .filter(t -> groupId.equals(t.groupId()) && topicId.equals(t.topicSubscriber().topic()))
+        .count();
+  }
+
   private class TestConsumer implements Consumer<Event> {
     private Event msg;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index d9d9fac..f9d6a6d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -50,6 +50,7 @@
   @Captor ArgumentCaptor<Event> eventMessageCaptor;
 
   private static final String TOPIC = "foo";
+  private static final String GROUP_ID = "bar";
 
   private Gson gson = new EventGsonProvider().get();
   private EventDeserializer deserializer = new EventDeserializer(gson);
@@ -127,6 +128,7 @@
             pubSubSubscriberMetricsMock,
             oneOffRequestContext,
             TOPIC,
+            GROUP_ID,
             consumer)
         .getMessageReceiver();
   }