Merge branch 'stable-3.4' into stable-3.5
* stable-3.4:
Consume events-broker from source
Change-Id: I68d43e9e9d0cceabff780f869098f26c8207f6cc
diff --git a/BUILD b/BUILD
index c34e07c..e526117 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
manifest_entries = [
"Gerrit-PluginName: events-gcloud-pubsub",
"Gerrit-Module: com.googlesource.gerrit.plugins.pubsub.Module",
+ "Gerrit-InitStep: com.googlesource.gerrit.plugins.pubsub.InitConfig",
"Implementation-Title: Gerrit events listener to send events to an external GCloud PubSub broker",
"Implementation-URL: https://gerrit.googlesource.com/plugins/events-gcloud-pubsub",
],
@@ -51,7 +52,6 @@
tags = ["events-gcloud-pubsub"],
deps = [
":events-gcloud-pubsub__plugin_test_deps",
- "//lib/testcontainers",
"//plugins/events-broker",
"@api-common//jar",
"@gax-grpc//jar",
@@ -86,10 +86,13 @@
visibility = ["//visibility:public"],
exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
":events-gcloud-pubsub__plugin",
- "//lib/jackson:jackson-annotations",
- "//lib/testcontainers",
- "//lib/testcontainers:docker-java-api",
- "//lib/testcontainers:docker-java-transport",
+ "@jackson-annotations//jar",
+ "@testcontainers//jar",
+ "@docker-java-api//jar",
+ "@docker-java-transport//jar",
+ "@duct-tape//jar",
+ "@visible-assertions//jar",
+ "@jna//jar",
"@testcontainers-gcloud//jar",
"@grpc-api//jar",
"@gax-grpc//jar",
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index b2c0cbe..29a06a1 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -43,10 +43,56 @@
sha1 = "11e565f1a65f7e2245238ac5c19875c0ddd25b14",
)
+ TESTCONTAINERS_VERSION = "1.15.3"
+
+ maven_jar(
+ name = "testcontainers",
+ artifact = "org.testcontainers:testcontainers:" + TESTCONTAINERS_VERSION,
+ sha1 = "95c6cfde71c2209f0c29cb14e432471e0b111880",
+ )
+
maven_jar(
name = "testcontainers-gcloud",
- artifact = "org.testcontainers:gcloud:1.15.2",
- sha1 = "0ad02bb83edc818469e1080995cae409f5d40694",
+ artifact = "org.testcontainers:gcloud:" + TESTCONTAINERS_VERSION,
+ sha1 = "a2908fc7ed7f09df9124314114757314612826ff",
+ )
+
+ maven_jar(
+ name = "duct-tape",
+ artifact = "org.rnorth.duct-tape:duct-tape:1.0.8",
+ sha1 = "92edc22a9ab2f3e17c9bf700aaee377d50e8b530",
+ )
+
+ maven_jar(
+ name = "visible-assertions",
+ artifact = "org.rnorth.visible-assertions:visible-assertions:2.1.2",
+ sha1 = "20d31a578030ec8e941888537267d3123c2ad1c1",
+ )
+
+ maven_jar(
+ name = "jna",
+ artifact = "net.java.dev.jna:jna:5.5.0",
+ sha1 = "0e0845217c4907822403912ad6828d8e0b256208",
+ )
+
+ DOCKER_JAVA_VERS = "3.2.8"
+
+ maven_jar(
+ name = "docker-java-api",
+ artifact = "com.github.docker-java:docker-java-api:" + DOCKER_JAVA_VERS,
+ sha1 = "4ac22a72d546a9f3523cd4b5fabffa77c4a6ec7c",
+ )
+
+ maven_jar(
+ name = "docker-java-transport",
+ artifact = "com.github.docker-java:docker-java-transport:" + DOCKER_JAVA_VERS,
+ sha1 = "c3b5598c67d0a5e2e780bf48f520da26b9915eab",
+ )
+
+ maven_jar(
+ name = "jackson-annotations",
+ artifact = "com.fasterxml.jackson.core:jackson-annotations:2.10.3",
+ sha1 = "0f63b3b1da563767d04d2e4d3fc1ae0cdeffebe7",
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
new file mode 100644
index 0000000..d7f15d6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
@@ -0,0 +1,107 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.ACK_DEADLINE_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_ACK_DEADLINE_SECONDS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SEND_STREAM_EVENTS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SHUTDOWN_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_STREAM_EVENTS_TOPIC;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SUBSCTIPRION_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.GCLOUD_PROJECT_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.NUMBER_OF_SUBSCRIBERS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.PRIVATE_KEY_LOCATION_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SHUTDOWN_TIMEOUT_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_ID_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_TIMEOUT_SECONDS_FIELD;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep { // Gerrit init step that asks for this plugin's settings on the console
+ private final Section pluginSection; // the "plugin" section for this plugin's name (see constructor)
+ private final String pluginName;
+ private final ConsoleUI ui;
+ private final GerritInstanceIdProvider gerritInstanceIdProvider; // supplies the default offered for the subscription id
+
+ @Inject
+ InitConfig(
+ Section.Factory sections,
+ @PluginName String pluginName,
+ GerritInstanceIdProvider gerritInstanceIdProvider,
+ ConsoleUI ui) {
+ this.pluginName = pluginName;
+ this.ui = ui;
+ this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+ this.pluginSection = sections.get("plugin", pluginName); // section "plugin", keyed by the plugin name
+ }
+
+ @Override
+ public void run() throws Exception { // asks for each setting in turn; the three mandatory fields come last
+ ui.header(String.format("%s plugin", pluginName));
+
+ boolean sendStreamEvents = ui.yesno(DEFAULT_SEND_STREAM_EVENTS, "Should send stream events?"); // first argument is the default answer
+ pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents)); // always written, as "true" or "false"
+
+ if (sendStreamEvents) { // the topic is only asked for when stream events are enabled
+ pluginSection.string(
+ "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+ }
+
+ pluginSection.string( // NOTE(review): Section.string presumably prompts and stores the answer; confirm against the Gerrit API
+ "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+ pluginSection.string(
+ "Timeout for subscriber ACKs (secs)",
+ ACK_DEADLINE_SECONDS_FIELD,
+ DEFAULT_ACK_DEADLINE_SECONDS);
+
+ pluginSection.string(
+ "Timeout for subscriber connection (secs)",
+ SUBSCRIPTION_TIMEOUT_SECONDS_FIELD,
+ DEFAULT_SUBSCTIPRION_TIMEOUT);
+
+ pluginSection.string(
+ "Timeout for subscriber shutdown (secs)",
+ SHUTDOWN_TIMEOUT_SECONDS_FIELD,
+ DEFAULT_SHUTDOWN_TIMEOUT);
+
+ mandatoryField(GCLOUD_PROJECT_FIELD, "Gcloud Project name", null); // no default offered
+ mandatoryField(SUBSCRIPTION_ID_FIELD, "Subscriber Id", gerritInstanceIdProvider.get()); // default: value from the instance id provider
+ mandatoryField(PRIVATE_KEY_LOCATION_FIELD, "Private key location", null); // no default offered
+ }
+
+ private void mandatoryField(String fieldName, String description, String dv) { // dv: default value passed to the prompt, may be null
+ String providedValue = pluginSection.string(description, fieldName, dv);
+
+ while (Strings.isNullOrEmpty(providedValue) && !ui.isBatch()) { // non-batch mode: keep asking until a non-empty value is given
+ ui.message("'%s' is mandatory. Please specify a value.", fieldName);
+ providedValue = pluginSection.string(description, fieldName, dv);
+ }
+
+ if (Strings.isNullOrEmpty(providedValue) && ui.isBatch()) { // batch mode: no retry loop, the missing value is only reported on stderr
+ System.err.printf(
+ "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+ pluginName, fieldName, pluginName);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
index 50fad61..24663b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -30,17 +30,25 @@
private PubSubApiModule pubSubApiModule;
private EnvironmentChecker environmentChecker;
+ private final PubSubConfiguration configuration;
@Inject
- public Module(PubSubApiModule pubSubApiModule, EnvironmentChecker environmentChecker) {
+ public Module(
+ PubSubApiModule pubSubApiModule,
+ EnvironmentChecker environmentChecker,
+ PubSubConfiguration configuration) {
this.pubSubApiModule = pubSubApiModule;
this.environmentChecker = environmentChecker;
+ this.configuration = configuration;
}
@Override
protected void configure() {
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
- DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+
+ if (configuration.isSendStreamEvents()) {
+ DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+ }
factory(PubSubPublisher.Factory.class);
factory(PubSubEventSubscriber.Factory.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
index 614d0aa..8203f5c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -24,11 +24,22 @@
@Singleton
public class PubSubConfiguration {
- private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
- private static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
- private static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
- private static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
- private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+ static final String GCLOUD_PROJECT_FIELD = "gcloudProject";
+ static final String SUBSCRIPTION_ID_FIELD = "subscriptionId";
+ static final String PRIVATE_KEY_LOCATION_FIELD = "privateKeyLocation";
+ static final String STREAM_EVENTS_TOPIC_FIELD = "streamEventsTopic";
+ static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+ static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+ static final String ACK_DEADLINE_SECONDS_FIELD = "ackDeadlineSeconds";
+ static final String SUBSCRIPTION_TIMEOUT_SECONDS_FIELD = "subscribtionTimeoutInSeconds";
+ static final String SHUTDOWN_TIMEOUT_SECONDS_FIELD = "shutdownTimeoutInSeconds";
+
+ static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
+ static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
+ static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
+ static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+ static final boolean DEFAULT_SEND_STREAM_EVENTS = false;
private final String gcloudProject;
private final String subscriptionId;
@@ -39,6 +50,7 @@
private final Long shutdownTimeoutInSeconds;
private final String streamEventsTopic;
private final PluginConfig fromGerritConfig;
+ private final boolean sendStreamEvents;
@Inject
public PubSubConfiguration(
@@ -46,24 +58,26 @@
@PluginName String pluginName,
@Nullable @GerritInstanceId String instanceId) {
this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
- this.gcloudProject = getMandatoryString("gcloudProject");
- this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
- this.privateKeyLocation = getMandatoryString("privateKeyLocation");
+ this.gcloudProject = getMandatoryString(GCLOUD_PROJECT_FIELD);
+ this.subscriptionId = getMandatoryString(SUBSCRIPTION_ID_FIELD, instanceId);
+ this.privateKeyLocation = getMandatoryString(PRIVATE_KEY_LOCATION_FIELD);
this.streamEventsTopic =
- fromGerritConfig.getString("streamEventsTopic", DEFAULT_STREAM_EVENTS_TOPIC);
+ fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+ this.sendStreamEvents =
+ fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, DEFAULT_SEND_STREAM_EVENTS);
this.numberOfSubscribers =
Integer.parseInt(
- fromGerritConfig.getString("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+ fromGerritConfig.getString(NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS));
this.ackDeadlineSeconds =
Integer.parseInt(
- fromGerritConfig.getString("ackDeadlineSeconds", DEFAULT_ACK_DEADLINE_SECONDS));
+ fromGerritConfig.getString(ACK_DEADLINE_SECONDS_FIELD, DEFAULT_ACK_DEADLINE_SECONDS));
this.subscribtionTimeoutInSeconds =
Long.parseLong(
fromGerritConfig.getString(
- "subscribtionTimeoutInSeconds", DEFAULT_SUBSCTIPRION_TIMEOUT));
+ SUBSCRIPTION_TIMEOUT_SECONDS_FIELD, DEFAULT_SUBSCTIPRION_TIMEOUT));
this.shutdownTimeoutInSeconds =
Long.parseLong(
- fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
+ fromGerritConfig.getString(SHUTDOWN_TIMEOUT_SECONDS_FIELD, DEFAULT_SHUTDOWN_TIMEOUT));
}
public String getGCloudProject() {
@@ -110,4 +124,8 @@
}
return value;
}
+
+ public boolean isSendStreamEvents() { // returns the sendStreamEvents flag that the constructor read from the plugin config
+ return sendStreamEvents;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index df69922..2a030c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -14,7 +14,6 @@
package com.googlesource.gerrit.plugins.pubsub;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
@@ -67,10 +66,6 @@
return publish(gson.toJson(event));
}
- public ListenableFuture<Boolean> publish(EventMessage event) {
- return publish(gson.toJson(event));
- }
-
private ListenableFuture<Boolean> publish(String eventPayload) {
ByteString data = ByteString.copyFromUtf8(eventPayload);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f3263d3..ef5677d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -47,3 +47,27 @@
: Optional. Name of the GCloud PubSub topic for stream events. events-gcloud-pubsub plugin exposes
all stream events under this topic name.
Default: gerrit
+
+`plugin.events-gcloud-pubsub.sendStreamEvents`
+: Optional. Whether to send stream events to the `streamEventsTopic` topic.
+ Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```
+*** events-gcloud-pubsub plugin
+***
+
+Should send stream events? [y/N]? y
+Stream events topic [gerrit]:
+Number of subscribers [6]:
+Timeout for subscriber ACKs (secs) [10]:
+Timeout for subscriber connection (secs) [10]:
+Timeout for subscriber shutdown (secs) [10]:
+Gcloud Project name : some_project
+Subscriber Id [6f174800-b1fa-477f-af49-26734f433280]:
+Private key location : /path/ssh/id_rsa
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index d202f25..e27e946 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -135,6 +135,7 @@
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "true")
public void shouldProduceStreamEvents() throws Exception {
String subscriptionId = "gerrit-subscription-id";
String topicId = "gerrit";
@@ -165,6 +166,26 @@
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "false")
+ public void shouldNotProduceStreamEventsWhenDisabled() throws Exception {
+ String subscriptionId = "gerrit-subscription-id";
+ String topicId = "gerrit";
+ createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
+
+ createChange();
+
+ readMessageAndValidate(
+ (pullResponse) -> assertThat(pullResponse.getReceivedMessagesList()).isEmpty(),
+ PROJECT_ID,
+ subscriptionId);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
public void shouldConsumeEvent() throws InterruptedException {
Event event = new ProjectCreatedEvent();
event.instanceId = DEFAULT_INSTANCE_ID;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
index 97a2c8d..d88f9d4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -19,13 +19,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import java.io.IOException;
-import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,9 +41,7 @@
@Mock PubSubPublisherMetrics pubSubPublisherMetricsMock;
private static final String TOPIC = "foo";
- private static final EventMessage eventMessage =
- new EventMessage(
- new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+ private static final Event eventMessage = new ProjectCreatedEvent();
@Before
public void setUp() throws IOException {