blob: d202f25703edfe7fc4eee4ab54ba2d470ed6a7e7 [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.pubsub;
import static com.google.common.truth.Truth.assertThat;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.NoHttpd;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.TopicName;
import com.googlesource.gerrit.plugins.pubsub.local.EnvironmentChecker;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.junit.Test;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.utility.DockerImageName;
@NoHttpd
@TestPlugin(
name = "events-gcloud-pubsub",
sysModule = "com.googlesource.gerrit.plugins.pubsub.Module")
public class PubSubBrokerApiIT extends LightweightPluginDaemonTest {
private static final String PROJECT_ID = "test_project";
private static final String TOPIC_ID = "test_topic";
private static final String SUBSCRIPTION_ID = "test_subscription_id";
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
private static final String PRIVATE_KEY_LOCATION = "not used in test";
private static final String DEFAULT_INSTANCE_ID = "instance-id";
@Inject @EventGson private Gson gson;
private TransportChannelProvider channelProvider;
private NoCredentialsProvider credentialsProvider;
private ManagedChannel channel;
private BrokerApi objectUnderTest;
public PubSubEmulatorContainer emulator =
new PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators"));
@Override
public void setUpTestPlugin() throws Exception {
emulator.start();
String hostport = emulator.getEmulatorEndpoint();
System.setProperty(EnvironmentChecker.PUBSUB_EMULATOR_HOST, hostport);
channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
credentialsProvider = NoCredentialsProvider.create();
createTopic(TOPIC_ID, channelProvider, credentialsProvider);
super.setUpTestPlugin();
objectUnderTest = plugin.getSysInjector().getInstance(BrokerApi.class);
}
@Override
public void tearDownTestPlugin() {
channel.shutdown();
emulator.close();
super.tearDownTestPlugin();
}
@Test
@GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
@GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
public void shouldSendEvent() throws IOException {
createSubscription(SUBSCRIPTION_ID, TOPIC_ID, channelProvider, credentialsProvider);
Event event = new ProjectCreatedEvent();
event.instanceId = DEFAULT_INSTANCE_ID;
String expectedMessageJson = gson.toJson(event);
objectUnderTest.send(TOPIC_ID, event);
readMessageAndValidate(
(pullResponse) -> {
assertThat(pullResponse.getReceivedMessagesList()).hasSize(1);
assertThat(pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8())
.isEqualTo(expectedMessageJson);
});
}
@Test
@GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
@GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
public void shouldProduceStreamEvents() throws Exception {
String subscriptionId = "gerrit-subscription-id";
String topicId = "gerrit";
createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
createChange();
readMessageAndValidate(
(pullResponse) -> {
List<ReceivedMessage> messages = pullResponse.getReceivedMessagesList();
assertThat(messages).hasSize(4);
Map<String, Long> messageTypeCount =
messages.stream()
.map(m -> gson.fromJson(m.getMessage().getData().toStringUtf8(), Map.class))
.map(m -> m.get("type").toString())
.collect(groupingBy(t -> t, counting()));
assertThat(messageTypeCount.get(RefUpdatedEvent.TYPE)).isEqualTo(3);
assertThat(messageTypeCount.get("patchset-created")).isEqualTo(1);
},
PROJECT_ID,
subscriptionId);
}
@Test
@GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
@GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
public void shouldConsumeEvent() throws InterruptedException {
Event event = new ProjectCreatedEvent();
event.instanceId = DEFAULT_INSTANCE_ID;
String expectedMessageJson = gson.toJson(event);
TestConsumer consumer = new TestConsumer();
objectUnderTest.receiveAsync(TOPIC_ID, consumer);
objectUnderTest.send(TOPIC_ID, event);
WaitUtil.waitUntil(
() ->
consumer.getMessage() != null
&& expectedMessageJson.equals(gson.toJson(consumer.getMessage())),
TEST_TIMEOUT);
}
private void readMessageAndValidate(Consumer<PullResponse> validate) throws IOException {
readMessageAndValidate(validate, PROJECT_ID, SUBSCRIPTION_ID);
}
private void readMessageAndValidate(
Consumer<PullResponse> validate, String projectId, String subscriptionId) throws IOException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(Integer.MAX_VALUE) // make sure that we read all messages
.setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
.build();
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
validate.accept(pullResponse);
}
}
private void createTopic(
String topicId,
TransportChannelProvider channelProvider,
NoCredentialsProvider credentialsProvider)
throws IOException {
TopicAdminSettings topicAdminSettings =
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
TopicName topicName = TopicName.of(PROJECT_ID, topicId);
topicAdminClient.createTopic(topicName);
}
}
private void createSubscription(
String subscriptionId,
String topicId,
TransportChannelProvider channelProvider,
NoCredentialsProvider credentialsProvider)
throws IOException {
SubscriptionAdminSettings subscriptionAdminSettings =
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create(subscriptionAdminSettings);
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
subscriptionAdminClient
.createSubscription(
subscriptionName,
TopicName.of(PROJECT_ID, topicId),
PushConfig.getDefaultInstance(),
10)
.getName();
}
private class TestConsumer implements Consumer<Event> {
private Event msg;
@Override
public void accept(Event msg) {
this.msg = msg;
}
public Event getMessage() {
return msg;
}
}
}