Allow subscribing to the topic with custom group id
In the past, when utilizing pull-replication as a standalone solution
in conjunction with a broker (inclusive of plugins such as events-kafka,
events-aws-kinesis, events-gcloud-pubsub), the consumer's groupId was
defined either within the events-* plugin configuration or, in the
absence of configuration, as "gerrit.instance-id."
Thanks to a new feature in events-broker, since v3.4.8-4-gd9f859f, it is
possible to define a different group id by configuring
"replication.eventBrokerGroupId" in the replication.conf.
NOTE: This feature is applicable only if the the events-* plugin
implements the ExtendedBrokerApi interface, which supports the ability
to subscribe to a topic with a custom group id.
Bug: Issue 299327285
Change-Id: I33d056ebd3e80b6a4c9d8eb02a0c2c1b591e8eb5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 5308294..72a6b74 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -152,7 +152,7 @@
if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
install(new StreamEventModule());
} else if (eventBrokerTopic != null) {
- install(new EventsBrokerConsumerModule(eventBrokerTopic));
+ install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
}
DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
index ce1a076..a8d5cf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -17,21 +17,28 @@
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
+import org.eclipse.jgit.lib.Config;
public class EventsBrokerConsumerModule extends LifecycleModule {
public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+ public static final String STREAM_EVENTS_GROUP_ID = "stream_events_group_id";
private final String topicName;
+ private final Config config;
- public EventsBrokerConsumerModule(String topicName) {
+ public EventsBrokerConsumerModule(String topicName, Config config) {
this.topicName = topicName;
+ this.config = config;
}
@Override
protected void configure() {
bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
-
+ bind(String.class)
+ .annotatedWith(Names.named(STREAM_EVENTS_GROUP_ID))
+ .toProvider(Providers.of(config.getString("replication", null, "eventBrokerGroupId")));
listener().to(EventsBrokerMessageConsumer.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index d06681b..13b038e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -14,9 +14,13 @@
package com.googlesource.gerrit.plugins.replication.pull.event;
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_GROUP_ID;
import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
@@ -29,22 +33,26 @@
public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final DynamicItem<BrokerApi> eventsBroker;
private final StreamEventListener eventListener;
private final ShutdownState shutdownState;
private final String eventsTopicName;
+ private final String groupId;
@Inject
public EventsBrokerMessageConsumer(
DynamicItem<BrokerApi> eventsBroker,
StreamEventListener eventListener,
ShutdownState shutdownState,
- @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+ @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName,
+ @Nullable @Named(STREAM_EVENTS_GROUP_ID) String groupId) {
this.eventsBroker = eventsBroker;
this.eventListener = eventListener;
this.shutdownState = shutdownState;
this.eventsTopicName = eventsTopicName;
+ this.groupId = groupId;
}
@Override
@@ -59,7 +67,21 @@
@Override
public void start() {
- eventsBroker.get().receiveAsync(eventsTopicName, this);
+ BrokerApi brokerApi = eventsBroker.get();
+ if (groupId == null) {
+ brokerApi.receiveAsync(eventsTopicName, this);
+ return;
+ }
+
+ if (!(eventsBroker instanceof ExtendedBrokerApi)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to load the pull-replication plugin: %s does not support the custom group-id '%s'.\n"
+ + "Remove replication.eventBrokerGroupId from replication.config or install a different event-broker plugin.",
+ eventsBroker.getClass(), groupId));
+ }
+
+ ((ExtendedBrokerApi) brokerApi).receiveAsync(eventsTopicName, groupId, this);
}
@Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6287773..0c3e02c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -222,6 +222,16 @@
Default: unset
+replication.eventBrokerGroupId
+: Optional group id for consumers. Used for keeping the offset of the
+ the last acked message on the topic with a specific group id; supported
+ only if the event-broker plugin supports the ExtendedBrokerApi
+ interface.
+ When not set, the messages received are acked using a global group id
+ shared by all subscribers in Gerrit.
+
+ Default: unset
+
replication.maxConnectionsPerRoute
: Maximum number of HTTP connections per one HTTP route.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 9640b82..8739d4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -49,7 +49,7 @@
shutdownState = new ShutdownState();
objectUnderTest =
new EventsBrokerMessageConsumer(
- eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
+ eventsBrokerDynamicItem, eventListener, shutdownState, "topicName", null);
}
@Test