Allow subscribing to the topic with custom group id In the past, when utilizing pull-replication as a standalone solution in conjunction with a broker (inclusive of plugins such as events-kafka, events-aws-kinesis, events-gcloud-pubsub), the consumer's groupId was defined either within the events-* plugin configuration or, in the absence of configuration, as "gerrit.instance-id." Thanks to a new feature in events-broker, since v3.4.8-4-gd9f859f, it is possible to define a different group id by configuring "replication.eventBrokerGroupId" in the replication.conf. NOTE: This feature is applicable only if the the events-* plugin implements the ExtendedBrokerApi interface, which supports the ability to subscribe to a topic with a custom group id. Bug: Issue 299327285 Change-Id: I33d056ebd3e80b6a4c9d8eb02a0c2c1b591e8eb5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java index 5308294..72a6b74 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -152,7 +152,7 @@ if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) { install(new StreamEventModule()); } else if (eventBrokerTopic != null) { - install(new EventsBrokerConsumerModule(eventBrokerTopic)); + install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig)); } DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java index ce1a076..a8d5cf4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -17,21 +17,28 @@ import com.google.gerrit.lifecycle.LifecycleModule; import com.google.inject.Scopes; import com.google.inject.name.Names; +import com.google.inject.util.Providers; +import org.eclipse.jgit.lib.Config; public class EventsBrokerConsumerModule extends LifecycleModule { public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name"; + public static final String STREAM_EVENTS_GROUP_ID = "stream_events_group_id"; private final String topicName; + private final Config config; - public EventsBrokerConsumerModule(String topicName) { + public EventsBrokerConsumerModule(String topicName, Config config) { this.topicName = topicName; + this.config = config; } @Override protected void configure() { bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON); bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName); - + bind(String.class) + .annotatedWith(Names.named(STREAM_EVENTS_GROUP_ID)) + .toProvider(Providers.of(config.getString("replication", null, "eventBrokerGroupId"))); listener().to(EventsBrokerMessageConsumer.class); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java index d06681b..13b038e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -14,9 +14,13 @@ package com.googlesource.gerrit.plugins.replication.pull.event; +import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_GROUP_ID; import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME; import com.gerritforge.gerrit.eventbroker.BrokerApi; +import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi; +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.common.Nullable; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; @@ -29,22 +33,26 @@ public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final DynamicItem<BrokerApi> eventsBroker; private final StreamEventListener eventListener; private final ShutdownState shutdownState; private final String eventsTopicName; + private final String groupId; @Inject public EventsBrokerMessageConsumer( DynamicItem<BrokerApi> eventsBroker, StreamEventListener eventListener, ShutdownState shutdownState, - @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) { + @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName, + @Nullable @Named(STREAM_EVENTS_GROUP_ID) String groupId) { this.eventsBroker = eventsBroker; this.eventListener = eventListener; this.shutdownState = shutdownState; this.eventsTopicName = eventsTopicName; + this.groupId = groupId; } @Override @@ -59,7 +67,21 @@ @Override public void start() { - eventsBroker.get().receiveAsync(eventsTopicName, this); + BrokerApi brokerApi = eventsBroker.get(); + if (groupId == null) { + brokerApi.receiveAsync(eventsTopicName, this); + return; + } + + if (!(eventsBroker instanceof ExtendedBrokerApi)) { + throw new IllegalArgumentException( + String.format( + "Failed to load the pull-replication plugin: %s does not support the custom group-id '%s'.\n" + + "Remove replication.eventBrokerGroupId from replication.config or install a different event-broker plugin.", + eventsBroker.getClass(), groupId)); + } + + ((ExtendedBrokerApi) brokerApi).receiveAsync(eventsTopicName, groupId, this); } @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 6287773..0c3e02c 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -222,6 +222,16 @@ Default: unset +replication.eventBrokerGroupId +: Optional group id for consumers. Used for keeping the offset of the + the last acked message on the topic with a specific group id; supported + only if the event-broker plugin supports the ExtendedBrokerApi + interface. + When not set, the messages received are acked using a global group id + shared by all subscribers in Gerrit. + + Default: unset + replication.maxConnectionsPerRoute : Maximum number of HTTP connections per one HTTP route.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java index 9640b82..8739d4b 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -49,7 +49,7 @@ shutdownState = new ShutdownState(); objectUnderTest = new EventsBrokerMessageConsumer( - eventsBrokerDynamicItem, eventListener, shutdownState, "topicName"); + eventsBrokerDynamicItem, eventListener, shutdownState, "topicName", null); } @Test