Adapt to the latest BrokerApi interface
Implement three methods added to the `BrokerApi` interface on `master`
and fix compilation errors.
The added methods are group-aware: they let callers subscribe, list,
and disconnect subscribers by `groupId`.
Bug: Issue 327699479
Change-Id: If3e4bd131bfd492d5e452b9989ac8d975991293e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
index 17233b0..6957afc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -16,12 +16,15 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -29,6 +32,7 @@
class PubSubBrokerApi implements BrokerApi {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private PubSubConfiguration configuration;
private PubSubPublisher.Factory publisherFactory;
private PubSubEventSubscriber.Factory subscriberFactory;
private Map<String, PubSubPublisher> publishers = new ConcurrentHashMap<>();
@@ -36,7 +40,10 @@
@Inject
public PubSubBrokerApi(
- PubSubPublisher.Factory publisherFactory, PubSubEventSubscriber.Factory subscriberFactory) {
+ PubSubConfiguration configuration,
+ PubSubPublisher.Factory publisherFactory,
+ PubSubEventSubscriber.Factory subscriberFactory) {
+ this.configuration = configuration;
this.publisherFactory = publisherFactory;
this.subscriberFactory = subscriberFactory;
subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -49,12 +56,29 @@
@Override
public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
- PubSubEventSubscriber subscriber = subscriberFactory.create(topic, eventConsumer);
+ receiveAsync(topic, null, eventConsumer);
+ }
+
+ @Override
+ public void receiveAsync(String topic, @Nullable String maybeGroupId, Consumer<Event> consumer) {
+ String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getSubscriptionId());
+ PubSubEventSubscriber subscriber = subscriberFactory.create(topic, groupId, consumer);
subscribers.add(subscriber);
subscriber.subscribe();
}
@Override
+ public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+ return subscribers.stream()
+ .map(
+ s ->
+ TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+ s.getGroupId(),
+ TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor())))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public Set<TopicSubscriber> topicSubscribers() {
return subscribers.stream()
.map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
@@ -81,6 +105,17 @@
}
@Override
+ public void disconnect(String topic, String groupId) {
+ subscribers.stream()
+ .filter(s -> s.getGroupId().equals(groupId) && topic.equals(s.getTopic()))
+ .forEach(
+ c -> {
+ subscribers.remove(c);
+ c.shutdown();
+ });
+ }
+
+ @Override
public void replayAllEvents(String topic) {
subscribers.stream()
.filter(subscriber -> topic.equals(subscriber.getTopic()))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index b8f4d18..3210051 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -34,7 +34,10 @@
public class PubSubEventSubscriber {
public interface Factory {
- public PubSubEventSubscriber create(String topic, Consumer<Event> messageProcessor);
+ public PubSubEventSubscriber create(
+ @Assisted("topic") String topic,
+ @Assisted("groupId") String groupId,
+ Consumer<Event> messageProcessor);
}
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -43,6 +46,7 @@
private final PubSubSubscriberMetrics subscriberMetrics;
private final OneOffRequestContext oneOffRequestContext;
private final String topic;
+ private final String groupId;
private final Consumer<Event> messageProcessor;
private final SubscriberProvider subscriberProvider;
private final PubSubConfiguration config;
@@ -55,12 +59,14 @@
PubSubConfiguration config,
PubSubSubscriberMetrics subscriberMetrics,
OneOffRequestContext oneOffRequestContext,
- @Assisted String topic,
+ @Assisted("topic") String topic,
+ @Assisted("groupId") String groupId,
@Assisted Consumer<Event> messageProcessor) {
this.eventsDeserializer = eventsDeserializer;
this.subscriberMetrics = subscriberMetrics;
this.oneOffRequestContext = oneOffRequestContext;
this.topic = topic;
+ this.groupId = groupId;
this.messageProcessor = messageProcessor;
this.subscriberProvider = subscriberProvider;
this.config = config;
@@ -68,7 +74,7 @@
public void subscribe() {
try {
- subscriber = subscriberProvider.get(topic, getMessageReceiver());
+ subscriber = subscriberProvider.get(topic, groupId, getMessageReceiver());
subscriber
.startAsync()
.awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -83,6 +89,10 @@
return topic;
}
+ public String getGroupId() {
+ return groupId;
+ }
+
public Consumer<Event> getMessageProcessor() {
return messageProcessor;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
index 31024d8..be4258f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
@@ -53,8 +53,8 @@
this.executor = executor;
}
- public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
- return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+ public Subscriber get(String topic, String groupId, MessageReceiver receiver) throws IOException {
+ return Subscriber.newBuilder(getOrCreateSubscription(topic, groupId).getName(), receiver)
.setExecutorProvider(FixedExecutorProvider.create(executor))
.setCredentialsProvider(credentials)
.build();
@@ -65,10 +65,14 @@
}
protected Subscription getOrCreateSubscription(String topicId) throws IOException {
+ return getOrCreateSubscription(topicId, pubSubProperties.getSubscriptionId());
+ }
+
+ protected Subscription getOrCreateSubscription(String topicId, String groupId)
+ throws IOException {
try (SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
- String subscriptionName =
- String.format("%s-%s", pubSubProperties.getSubscriptionId(), topicId);
+ String subscriptionName = String.format("%s-%s", groupId, topicId);
ProjectSubscriptionName projectSubscriptionName =
ProjectSubscriptionName.of(pubSubProperties.getGCloudProject(), subscriptionName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
index a0da1e5..97e61f8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
@@ -50,10 +50,10 @@
}
@Override
- public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+ public Subscriber get(String topic, String groupId, MessageReceiver receiver) throws IOException {
TransportChannelProvider channelProvider = createChannelProvider();
createTopic(channelProvider, pubSubProperties.getGCloudProject(), topic);
- return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+ return Subscriber.newBuilder(getOrCreateSubscription(topic, groupId).getName(), receiver)
.setChannelProvider(channelProvider)
.setExecutorProvider(FixedExecutorProvider.create(executor))
.setCredentialsProvider(credentials)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index e27e946..3654397 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -66,6 +66,7 @@
public class PubSubBrokerApiIT extends LightweightPluginDaemonTest {
private static final String PROJECT_ID = "test_project";
private static final String TOPIC_ID = "test_topic";
+ private static final String GROUP_ID = "test_group";
private static final String SUBSCRIPTION_ID = "test_subscription_id";
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
@@ -203,6 +204,51 @@
TEST_TIMEOUT);
}
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
+ public void shouldConsumeEventWithGroupId() throws InterruptedException {
+ Event event = new ProjectCreatedEvent();
+ event.instanceId = DEFAULT_INSTANCE_ID;
+ String expectedMessageJson = gson.toJson(event);
+ TestConsumer consumer = new TestConsumer();
+
+ objectUnderTest.receiveAsync(TOPIC_ID, GROUP_ID, consumer);
+
+ objectUnderTest.send(TOPIC_ID, event);
+
+ assertThat(countSubscribers()).isEqualTo(1);
+ WaitUtil.waitUntil(
+ () ->
+ consumer.getMessage() != null
+ && expectedMessageJson.equals(gson.toJson(consumer.getMessage())),
+ TEST_TIMEOUT);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
+ public void shouldDisconnectOnlySubscriberForSpecificGroupId() throws Exception {
+ String streamName = "stream_name";
+ String groupId1 = "group1";
+ String groupId2 = "group2";
+ TestConsumer consumer = new TestConsumer();
+
+ objectUnderTest.receiveAsync(streamName, groupId1, consumer);
+ objectUnderTest.receiveAsync(streamName, groupId2, consumer);
+
+ objectUnderTest.disconnect(streamName, groupId2);
+
+ assertThat(countSubscribers(streamName, groupId2)).isEqualTo(0L);
+ assertThat(countSubscribers(streamName, groupId1)).isEqualTo(1L);
+ }
+
private void readMessageAndValidate(Consumer<PullResponse> validate) throws IOException {
readMessageAndValidate(validate, PROJECT_ID, SUBSCRIPTION_ID);
}
@@ -266,6 +312,16 @@
.getName();
}
+ private long countSubscribers() {
+ return countSubscribers(TOPIC_ID, GROUP_ID);
+ }
+
+ private long countSubscribers(String topicId, String groupId) {
+ return objectUnderTest.topicSubscribersWithGroupId().stream()
+ .filter(t -> groupId.equals(t.groupId()) && topicId.equals(t.topicSubscriber().topic()))
+ .count();
+ }
+
private class TestConsumer implements Consumer<Event> {
private Event msg;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index d9d9fac..f9d6a6d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -50,6 +50,7 @@
@Captor ArgumentCaptor<Event> eventMessageCaptor;
private static final String TOPIC = "foo";
+ private static final String GROUP_ID = "bar";
private Gson gson = new EventGsonProvider().get();
private EventDeserializer deserializer = new EventDeserializer(gson);
@@ -127,6 +128,7 @@
pubSubSubscriberMetricsMock,
oneOffRequestContext,
TOPIC,
+ GROUP_ID,
consumer)
.getMessageReceiver();
}