Allow subscribing to the topic with custom group id

In the past, when utilizing pull-replication as a standalone solution
in conjunction with a broker (inclusive of plugins such as events-kafka,
events-aws-kinesis, events-gcloud-pubsub), the consumer's groupId was
defined either within the events-* plugin configuration or, in the
absence of configuration, as "gerrit.instance-id."

Thanks to a new feature in events-broker, since v3.4.8-4-gd9f859f, it is
possible to define a different group id by configuring
"replication.eventBrokerGroupId" in the replication.conf.

NOTE: This feature is applicable only if the the events-* plugin
implements the ExtendedBrokerApi interface, which supports the ability
to subscribe to a topic with a custom group id.

Bug: Issue 299327285
Change-Id: I33d056ebd3e80b6a4c9d8eb02a0c2c1b591e8eb5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 5308294..72a6b74 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -152,7 +152,7 @@
     if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
       install(new StreamEventModule());
     } else if (eventBrokerTopic != null) {
-      install(new EventsBrokerConsumerModule(eventBrokerTopic));
+      install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
index ce1a076..a8d5cf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -17,21 +17,28 @@
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Scopes;
 import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
+import org.eclipse.jgit.lib.Config;
 
 public class EventsBrokerConsumerModule extends LifecycleModule {
   public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+  public static final String STREAM_EVENTS_GROUP_ID = "stream_events_group_id";
 
   private final String topicName;
+  private final Config config;
 
-  public EventsBrokerConsumerModule(String topicName) {
+  public EventsBrokerConsumerModule(String topicName, Config config) {
     this.topicName = topicName;
+    this.config = config;
   }
 
   @Override
   protected void configure() {
     bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
     bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
-
+    bind(String.class)
+        .annotatedWith(Names.named(STREAM_EVENTS_GROUP_ID))
+        .toProvider(Providers.of(config.getString("replication", null, "eventBrokerGroupId")));
     listener().to(EventsBrokerMessageConsumer.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index d06681b..13b038e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -14,9 +14,13 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.event;
 
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_GROUP_ID;
 import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -29,22 +33,26 @@
 
 public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
 
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final DynamicItem<BrokerApi> eventsBroker;
   private final StreamEventListener eventListener;
   private final ShutdownState shutdownState;
   private final String eventsTopicName;
+  private final String groupId;
 
   @Inject
   public EventsBrokerMessageConsumer(
       DynamicItem<BrokerApi> eventsBroker,
       StreamEventListener eventListener,
       ShutdownState shutdownState,
-      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName,
+      @Nullable @Named(STREAM_EVENTS_GROUP_ID) String groupId) {
 
     this.eventsBroker = eventsBroker;
     this.eventListener = eventListener;
     this.shutdownState = shutdownState;
     this.eventsTopicName = eventsTopicName;
+    this.groupId = groupId;
   }
 
   @Override
@@ -59,7 +67,21 @@
 
   @Override
   public void start() {
-    eventsBroker.get().receiveAsync(eventsTopicName, this);
+    BrokerApi brokerApi = eventsBroker.get();
+    if (groupId == null) {
+      brokerApi.receiveAsync(eventsTopicName, this);
+      return;
+    }
+
+    if (!(eventsBroker instanceof ExtendedBrokerApi)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to load the pull-replication plugin: %s does not support the custom group-id '%s'.\n"
+                  + "Remove replication.eventBrokerGroupId from replication.config or install a different event-broker plugin.",
+              eventsBroker.getClass(), groupId));
+    }
+
+    ((ExtendedBrokerApi) brokerApi).receiveAsync(eventsTopicName, groupId, this);
   }
 
   @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6287773..0c3e02c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -222,6 +222,16 @@
 
 	Default: unset
 
+replication.eventBrokerGroupId
+:	Optional group id for consumers. Used for keeping the offset of the
+	the last acked message on the topic with a specific group id; supported
+	only if the event-broker plugin supports the ExtendedBrokerApi
+	interface.
+	When not set, the messages received are acked using a global group id
+	shared by all subscribers in Gerrit.
+
+	Default: unset
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 9640b82..8739d4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -49,7 +49,7 @@
     shutdownState = new ShutdownState();
     objectUnderTest =
         new EventsBrokerMessageConsumer(
-            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
+            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName", null);
   }
 
   @Test