Adapt to the latest BrokerApi interface
The latest BrokerApiInterface requires the implementation of three
new interfaces :
- receiveAsync(String streamName, String groupId, Consumer<Event> consumer)
- disconnect(String topic, String groupId)
- Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId()
Implement the above interfaces for kinesis, so that kinesis streams
subscriptions can be managed by providing an explicit GroupId.
Bug: Issue 327473326
Change-Id: I5f5849c1ffd0a4a3a760794aae4589e99cff0cee
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
index 86fad53..df03b61 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
@@ -55,8 +55,8 @@
this.dynamoDbAsyncClient = dynamoDbAsyncClient;
}
- public void setAllShardsToBeginning(String streamName) {
- String leaseTable = cosumerLeaseName(configuration.getApplicationName(), streamName);
+ public void setAllShardsToBeginning(String streamName, String groupId) {
+ String leaseTable = cosumerLeaseName(groupId, streamName);
try {
for (String shard : getAllShards(leaseTable)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 89476cf..6291152 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -228,8 +228,8 @@
: System.getProperty(name);
}
- public static String cosumerLeaseName(String applicationName, String streamName) {
- return String.format("%s-%s", applicationName, streamName);
+ public static String cosumerLeaseName(String groupId, String streamName) {
+ return String.format("%s-%s", groupId, streamName);
}
public Long getShutdownTimeoutMs() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 0316c06..675d801 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -16,10 +16,13 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -27,15 +30,19 @@
class KinesisBrokerApi implements BrokerApi {
private final KinesisConsumer.Factory consumerFactory;
+ private final Configuration configuration;
private final KinesisPublisher kinesisPublisher;
private final Set<KinesisConsumer> consumers;
@Inject
public KinesisBrokerApi(
- KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
+ KinesisPublisher kinesisPublisher,
+ KinesisConsumer.Factory consumerFactory,
+ Configuration configuration) {
this.kinesisPublisher = kinesisPublisher;
this.consumerFactory = consumerFactory;
+ this.configuration = configuration;
this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
}
@@ -46,9 +53,12 @@
@Override
public void receiveAsync(String streamName, Consumer<Event> eventConsumer) {
- KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
- consumers.add(consumer);
- consumer.subscribe(streamName, eventConsumer);
+ receive(streamName, eventConsumer, null);
+ }
+
+ @Override
+ public void receiveAsync(String streamName, String groupId, Consumer<Event> consumer) {
+ receive(streamName, consumer, groupId);
}
@Override
@@ -65,9 +75,40 @@
}
@Override
+ public void disconnect(String topic, String groupId) {
+ Set<KinesisConsumer> consumersOfTopic =
+ consumers
+ .parallelStream()
+ .filter(c -> topic.equals(c.getStreamName()) && groupId.equals(c.getGroupId()))
+ .collect(Collectors.toSet());
+
+ consumersOfTopic.forEach(KinesisConsumer::shutdown);
+ consumers.removeAll(consumersOfTopic);
+ }
+
+ @Override
public void replayAllEvents(String topic) {
consumers.stream()
.filter(subscriber -> topic.equals(subscriber.getStreamName()))
.forEach(KinesisConsumer::resetOffset);
}
+
+ @Override
+ public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+ return consumers.stream()
+ .map(
+ s ->
+ TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+ s.getGroupId(),
+ TopicSubscriber.topicSubscriber(s.getStreamName(), s.getMessageProcessor())))
+ .collect(Collectors.toSet());
+ }
+
+ private void receive(
+ String streamName, Consumer<Event> eventConsumer, @Nullable String maybeGroupId) {
+ String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getApplicationName());
+ KinesisConsumer consumer = consumerFactory.create(streamName, groupId, eventConsumer);
+ consumers.add(consumer);
+ consumer.subscribe(streamName, eventConsumer);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index b563e26..d17595c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -17,6 +17,7 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -26,7 +27,10 @@
class KinesisConsumer {
interface Factory {
- KinesisConsumer create(String topic, Consumer<Event> messageProcessor);
+ KinesisConsumer create(
+ @Assisted("topic") String topic,
+ @Assisted("groupId") String groupId,
+ Consumer<Event> messageProcessor);
}
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -34,6 +38,8 @@
private final CheckpointResetter checkpointResetter;
private final Configuration configuration;
private final ExecutorService executor;
+
+ private final String groupId;
private Scheduler kinesisScheduler;
private java.util.function.Consumer<Event> messageProcessor;
@@ -45,11 +51,13 @@
SchedulerProvider.Factory schedulerFactory,
CheckpointResetter checkpointResetter,
Configuration configuration,
- @ConsumerExecutor ExecutorService executor) {
+ @ConsumerExecutor ExecutorService executor,
+ @Assisted("groupId") String groupId) {
this.schedulerFactory = schedulerFactory;
this.checkpointResetter = checkpointResetter;
this.configuration = configuration;
this.executor = executor;
+ this.groupId = groupId;
}
public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
@@ -57,12 +65,14 @@
this.messageProcessor = messageProcessor;
logger.atInfo().log("Subscribe kinesis consumer to stream [%s]", streamName);
- runReceiver(messageProcessor);
+ runReceiver(groupId, messageProcessor);
}
- private void runReceiver(java.util.function.Consumer<Event> messageProcessor) {
+ private void runReceiver(String groupId, java.util.function.Consumer<Event> messageProcessor) {
this.kinesisScheduler =
- schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
+ schedulerFactory
+ .create(streamName, groupId, resetOffset.getAndSet(false), messageProcessor)
+ .get();
executor.execute(kinesisScheduler);
}
@@ -88,10 +98,14 @@
return streamName;
}
+ public String getGroupId() {
+ return groupId;
+ }
+
public void resetOffset() {
// Move all checkpoints (if any) to TRIM_HORIZON, so that the consumer
// scheduler will start consuming from beginning.
- checkpointResetter.setAllShardsToBeginning(streamName);
+ checkpointResetter.setAllShardsToBeginning(streamName, groupId);
// Even when no checkpoints have been persisted, instruct the consumer
// scheduler to start from TRIM_HORIZON, irrespective of 'initialPosition'
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 09083df..deffd67 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -33,7 +33,8 @@
class SchedulerProvider implements Provider<Scheduler> {
interface Factory {
SchedulerProvider create(
- String streamName,
+ @Assisted("streamName") String streamName,
+ @Assisted("groupId") String groupId,
boolean fromBeginning,
java.util.function.Consumer<Event> messageProcessor);
}
@@ -51,7 +52,8 @@
DynamoDbAsyncClient dynamoDbAsyncClient,
CloudWatchAsyncClient cloudWatchAsyncClient,
KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
- @Assisted String streamName,
+ @Assisted("streamName") String streamName,
+ @Assisted("groupId") String groupId,
@Assisted boolean fromBeginning,
@Assisted java.util.function.Consumer<Event> messageProcessor) {
this.configuration = configuration;
@@ -61,7 +63,7 @@
this.configsBuilder =
new ConfigsBuilder(
streamName,
- cosumerLeaseName(configuration.getApplicationName(), streamName),
+ cosumerLeaseName(groupId, streamName),
kinesisAsyncClient,
dynamoDbAsyncClient,
cloudWatchAsyncClient,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 7449440..7b61d10 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -124,6 +124,45 @@
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+ public void shouldConsumeEventBySubscribingWithSpecificGroupId() throws Exception {
+ String streamName = UUID.randomUUID().toString();
+ String groupId = UUID.randomUUID().toString();
+ createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+ EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+ kinesisBroker().receiveAsync(streamName, groupId, eventConsumerCounter);
+
+ Event event = eventMessage();
+ kinesisBroker().send(streamName, event);
+ WaitUtil.waitUntil(
+ () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+ compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
+
+ assertThat(countSubscribers(streamName, groupId)).isEqualTo(1L);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+ @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+ public void shouldDisconnectOnlySubscriberForSpecificGroupId() throws Exception {
+ String streamName = UUID.randomUUID().toString();
+ String groupId1 = UUID.randomUUID().toString();
+ String groupId2 = UUID.randomUUID().toString();
+ createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+ EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+ kinesisBroker().receiveAsync(streamName, groupId1, eventConsumerCounter);
+ kinesisBroker().receiveAsync(streamName, groupId2, eventConsumerCounter);
+
+ kinesisBroker().disconnect(streamName, groupId2);
+
+ assertThat(countSubscribers(streamName, groupId2)).isEqualTo(0L);
+ assertThat(countSubscribers(streamName, groupId1)).isEqualTo(1L);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+ @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
public void shouldConsumeAnEventWithoutInstanceId() throws Exception {
String streamName = UUID.randomUUID().toString();
createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
@@ -277,6 +316,12 @@
assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
}
+ private long countSubscribers(String streamName, String groupId) {
+ return kinesisBroker().topicSubscribersWithGroupId().stream()
+ .filter(t -> groupId.equals(t.groupId()) && streamName.equals(t.topicSubscriber().topic()))
+ .count();
+ }
+
private static class EventConsumerCounter implements Consumer<Event> {
List<Event> consumedMessages = new ArrayList<>();