Merge changes Ib4153e78,I8569d093
* changes:
Introduce InitStep
Only publish stream events when sendStreamEvents is set.
diff --git a/BUILD b/BUILD
index 9714d33..40c9a43 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
manifest_entries = [
"Gerrit-PluginName: events-gcloud-pubsub",
"Gerrit-Module: com.googlesource.gerrit.plugins.pubsub.Module",
+ "Gerrit-InitStep: com.googlesource.gerrit.plugins.pubsub.InitConfig",
"Implementation-Title: Gerrit events listener to send events to an external GCloud PubSub broker",
"Implementation-URL: https://gerrit.googlesource.com/plugins/events-gcloud-pubsub",
],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
new file mode 100644
index 0000000..d7f15d6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
@@ -0,0 +1,107 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.ACK_DEADLINE_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_ACK_DEADLINE_SECONDS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SEND_STREAM_EVENTS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SHUTDOWN_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_STREAM_EVENTS_TOPIC;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SUBSCTIPRION_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.GCLOUD_PROJECT_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.NUMBER_OF_SUBSCRIBERS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.PRIVATE_KEY_LOCATION_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SHUTDOWN_TIMEOUT_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_ID_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_TIMEOUT_SECONDS_FIELD;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+public class InitConfig implements InitStep {
+ private final Section pluginSection;
+ private final String pluginName;
+ private final ConsoleUI ui;
+ private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+ @Inject
+ InitConfig(
+ Section.Factory sections,
+ @PluginName String pluginName,
+ GerritInstanceIdProvider gerritInstanceIdProvider,
+ ConsoleUI ui) {
+ this.pluginName = pluginName;
+ this.ui = ui;
+ this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+ this.pluginSection = sections.get("plugin", pluginName);
+ }
+
+ @Override
+ public void run() throws Exception {
+ ui.header(String.format("%s plugin", pluginName));
+
+ boolean sendStreamEvents = ui.yesno(DEFAULT_SEND_STREAM_EVENTS, "Should send stream events?");
+ pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+ if (sendStreamEvents) {
+ pluginSection.string(
+ "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+ }
+
+ pluginSection.string(
+ "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+ pluginSection.string(
+ "Timeout for subscriber ACKs (secs)",
+ ACK_DEADLINE_SECONDS_FIELD,
+ DEFAULT_ACK_DEADLINE_SECONDS);
+
+ pluginSection.string(
+ "Timeout for subscriber connection (secs)",
+ SUBSCRIPTION_TIMEOUT_SECONDS_FIELD,
+ DEFAULT_SUBSCTIPRION_TIMEOUT);
+
+ pluginSection.string(
+ "Timeout for subscriber shutdown (secs)",
+ SHUTDOWN_TIMEOUT_SECONDS_FIELD,
+ DEFAULT_SHUTDOWN_TIMEOUT);
+
+ mandatoryField(GCLOUD_PROJECT_FIELD, "Gcloud Project name", null);
+ mandatoryField(SUBSCRIPTION_ID_FIELD, "Subscriber Id", gerritInstanceIdProvider.get());
+ mandatoryField(PRIVATE_KEY_LOCATION_FIELD, "Private key location", null);
+ }
+
+ private void mandatoryField(String fieldName, String description, String dv) {
+ String providedValue = pluginSection.string(description, fieldName, dv);
+
+ while (Strings.isNullOrEmpty(providedValue) && !ui.isBatch()) {
+ ui.message("'%s' is mandatory. Please specify a value.", fieldName);
+ providedValue = pluginSection.string(description, fieldName, dv);
+ }
+
+ if (Strings.isNullOrEmpty(providedValue) && ui.isBatch()) {
+ System.err.printf(
+ "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+ pluginName, fieldName, pluginName);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
index 50fad61..24663b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -30,17 +30,25 @@
private PubSubApiModule pubSubApiModule;
private EnvironmentChecker environmentChecker;
+ private final PubSubConfiguration configuration;
@Inject
- public Module(PubSubApiModule pubSubApiModule, EnvironmentChecker environmentChecker) {
+ public Module(
+ PubSubApiModule pubSubApiModule,
+ EnvironmentChecker environmentChecker,
+ PubSubConfiguration configuration) {
this.pubSubApiModule = pubSubApiModule;
this.environmentChecker = environmentChecker;
+ this.configuration = configuration;
}
@Override
protected void configure() {
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
- DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+
+ if (configuration.isSendStreamEvents()) {
+ DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+ }
factory(PubSubPublisher.Factory.class);
factory(PubSubEventSubscriber.Factory.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
index 614d0aa..8203f5c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -24,11 +24,22 @@
@Singleton
public class PubSubConfiguration {
- private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
- private static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
- private static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
- private static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
- private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+ static final String GCLOUD_PROJECT_FIELD = "gcloudProject";
+ static final String SUBSCRIPTION_ID_FIELD = "subscriptionId";
+ static final String PRIVATE_KEY_LOCATION_FIELD = "privateKeyLocation";
+ static final String STREAM_EVENTS_TOPIC_FIELD = "streamEventsTopic";
+ static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+ static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+ static final String ACK_DEADLINE_SECONDS_FIELD = "ackDeadlineSeconds";
+ static final String SUBSCRIPTION_TIMEOUT_SECONDS_FIELD = "subscribtionTimeoutInSeconds";
+ static final String SHUTDOWN_TIMEOUT_SECONDS_FIELD = "shutdownTimeoutInSeconds";
+
+ static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
+ static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
+ static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
+ static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+ static final boolean DEFAULT_SEND_STREAM_EVENTS = false;
private final String gcloudProject;
private final String subscriptionId;
@@ -39,6 +50,7 @@
private final Long shutdownTimeoutInSeconds;
private final String streamEventsTopic;
private final PluginConfig fromGerritConfig;
+ private final boolean sendStreamEvents;
@Inject
public PubSubConfiguration(
@@ -46,24 +58,26 @@
@PluginName String pluginName,
@Nullable @GerritInstanceId String instanceId) {
this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
- this.gcloudProject = getMandatoryString("gcloudProject");
- this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
- this.privateKeyLocation = getMandatoryString("privateKeyLocation");
+ this.gcloudProject = getMandatoryString(GCLOUD_PROJECT_FIELD);
+ this.subscriptionId = getMandatoryString(SUBSCRIPTION_ID_FIELD, instanceId);
+ this.privateKeyLocation = getMandatoryString(PRIVATE_KEY_LOCATION_FIELD);
this.streamEventsTopic =
- fromGerritConfig.getString("streamEventsTopic", DEFAULT_STREAM_EVENTS_TOPIC);
+ fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+ this.sendStreamEvents =
+ fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, DEFAULT_SEND_STREAM_EVENTS);
this.numberOfSubscribers =
Integer.parseInt(
- fromGerritConfig.getString("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+ fromGerritConfig.getString(NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS));
this.ackDeadlineSeconds =
Integer.parseInt(
- fromGerritConfig.getString("ackDeadlineSeconds", DEFAULT_ACK_DEADLINE_SECONDS));
+ fromGerritConfig.getString(ACK_DEADLINE_SECONDS_FIELD, DEFAULT_ACK_DEADLINE_SECONDS));
this.subscribtionTimeoutInSeconds =
Long.parseLong(
fromGerritConfig.getString(
- "subscribtionTimeoutInSeconds", DEFAULT_SUBSCTIPRION_TIMEOUT));
+ SUBSCRIPTION_TIMEOUT_SECONDS_FIELD, DEFAULT_SUBSCTIPRION_TIMEOUT));
this.shutdownTimeoutInSeconds =
Long.parseLong(
- fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
+ fromGerritConfig.getString(SHUTDOWN_TIMEOUT_SECONDS_FIELD, DEFAULT_SHUTDOWN_TIMEOUT));
}
public String getGCloudProject() {
@@ -110,4 +124,8 @@
}
return value;
}
+
+ public boolean isSendStreamEvents() {
+ return sendStreamEvents;
+ }
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f3263d3..ef5677d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -47,3 +47,27 @@
: Optional. Name of the GCloud PubSub topic for stream events. events-gcloud-pubsub plugin exposes
all stream events under this topic name.
Default: gerrit
+
+`plugin.events-gcloud-pubsub.sendStreamEvents`
+: Optional. Whether to publish stream events to the topic configured by `streamEventsTopic`.
+ Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```
+*** events-gcloud-pubsub plugin
+***
+
+Should send stream events? [y/N]? y
+Stream events topic [gerrit]:
+Number of subscribers [6]:
+Timeout for subscriber ACKs (secs) [10]:
+Timeout for subscriber connection (secs) [10]:
+Timeout for subscriber shutdown (secs) [10]:
+Gcloud Project name : some_project
+Subscriber Id [6f174800-b1fa-477f-af49-26734f433280]:
+Private key location : /path/to/service-account-key.json
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index d202f25..e27e946 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -135,6 +135,7 @@
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "true")
public void shouldProduceStreamEvents() throws Exception {
String subscriptionId = "gerrit-subscription-id";
String topicId = "gerrit";
@@ -165,6 +166,26 @@
@GerritConfig(
name = "plugin.events-gcloud-pubsub.privateKeyLocation",
value = PRIVATE_KEY_LOCATION)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "false")
+ public void shouldNotProduceStreamEventsWhenDisabled() throws Exception {
+ String subscriptionId = "gerrit-subscription-id";
+ String topicId = "gerrit";
+ createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
+
+ createChange();
+
+ readMessageAndValidate(
+ (pullResponse) -> assertThat(pullResponse.getReceivedMessagesList()).isEmpty(),
+ PROJECT_ID,
+ subscriptionId);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
public void shouldConsumeEvent() throws InterruptedException {
Event event = new ProjectCreatedEvent();
event.instanceId = DEFAULT_INSTANCE_ID;