Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Consume events-broker from source

Change-Id: I68d43e9e9d0cceabff780f869098f26c8207f6cc
diff --git a/BUILD b/BUILD
index c34e07c..e526117 100644
--- a/BUILD
+++ b/BUILD
@@ -12,6 +12,7 @@
     manifest_entries = [
         "Gerrit-PluginName: events-gcloud-pubsub",
         "Gerrit-Module: com.googlesource.gerrit.plugins.pubsub.Module",
+        "Gerrit-InitStep: com.googlesource.gerrit.plugins.pubsub.InitConfig",
         "Implementation-Title: Gerrit events listener to send events to an external GCloud PubSub broker",
         "Implementation-URL: https://gerrit.googlesource.com/plugins/events-gcloud-pubsub",
     ],
@@ -51,7 +52,6 @@
     tags = ["events-gcloud-pubsub"],
     deps = [
         ":events-gcloud-pubsub__plugin_test_deps",
-        "//lib/testcontainers",
         "//plugins/events-broker",
         "@api-common//jar",
         "@gax-grpc//jar",
@@ -86,10 +86,13 @@
     visibility = ["//visibility:public"],
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":events-gcloud-pubsub__plugin",
-        "//lib/jackson:jackson-annotations",
-        "//lib/testcontainers",
-        "//lib/testcontainers:docker-java-api",
-        "//lib/testcontainers:docker-java-transport",
+        "@jackson-annotations//jar",
+        "@testcontainers//jar",
+        "@docker-java-api//jar",
+        "@docker-java-transport//jar",
+        "@duct-tape//jar",
+        "@visible-assertions//jar",
+        "@jna//jar",
         "@testcontainers-gcloud//jar",
         "@grpc-api//jar",
         "@gax-grpc//jar",
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index b2c0cbe..29a06a1 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -43,10 +43,56 @@
         sha1 = "11e565f1a65f7e2245238ac5c19875c0ddd25b14",
     )
 
+    TESTCONTAINERS_VERSION = "1.15.3"
+
+    maven_jar(
+        name = "testcontainers",
+        artifact = "org.testcontainers:testcontainers:" + TESTCONTAINERS_VERSION,
+        sha1 = "95c6cfde71c2209f0c29cb14e432471e0b111880",
+    )
+
     maven_jar(
         name = "testcontainers-gcloud",
-        artifact = "org.testcontainers:gcloud:1.15.2",
-        sha1 = "0ad02bb83edc818469e1080995cae409f5d40694",
+        artifact = "org.testcontainers:gcloud:" + TESTCONTAINERS_VERSION,
+        sha1 = "a2908fc7ed7f09df9124314114757314612826ff",
+    )
+
+    maven_jar(
+        name = "duct-tape",
+        artifact = "org.rnorth.duct-tape:duct-tape:1.0.8",
+        sha1 = "92edc22a9ab2f3e17c9bf700aaee377d50e8b530",
+    )
+
+    maven_jar(
+        name = "visible-assertions",
+        artifact = "org.rnorth.visible-assertions:visible-assertions:2.1.2",
+        sha1 = "20d31a578030ec8e941888537267d3123c2ad1c1",
+    )
+
+    maven_jar(
+        name = "jna",
+        artifact = "net.java.dev.jna:jna:5.5.0",
+        sha1 = "0e0845217c4907822403912ad6828d8e0b256208",
+    )
+
+    DOCKER_JAVA_VERS = "3.2.8"
+
+    maven_jar(
+        name = "docker-java-api",
+        artifact = "com.github.docker-java:docker-java-api:" + DOCKER_JAVA_VERS,
+        sha1 = "4ac22a72d546a9f3523cd4b5fabffa77c4a6ec7c",
+    )
+
+    maven_jar(
+        name = "docker-java-transport",
+        artifact = "com.github.docker-java:docker-java-transport:" + DOCKER_JAVA_VERS,
+        sha1 = "c3b5598c67d0a5e2e780bf48f520da26b9915eab",
+    )
+
+    maven_jar(
+        name = "jackson-annotations",
+        artifact = "com.fasterxml.jackson.core:jackson-annotations:2.10.3",
+        sha1 = "0f63b3b1da563767d04d2e4d3fc1ae0cdeffebe7",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
new file mode 100644
index 0000000..d7f15d6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/InitConfig.java
@@ -0,0 +1,148 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.ACK_DEADLINE_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_ACK_DEADLINE_SECONDS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_NUMBER_OF_SUBSCRIBERS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SEND_STREAM_EVENTS;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SHUTDOWN_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_STREAM_EVENTS_TOPIC;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.DEFAULT_SUBSCTIPRION_TIMEOUT;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.GCLOUD_PROJECT_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.NUMBER_OF_SUBSCRIBERS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.PRIVATE_KEY_LOCATION_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SEND_STREAM_EVENTS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SHUTDOWN_TIMEOUT_SECONDS_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.STREAM_EVENTS_TOPIC_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_ID_FIELD;
+import static com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration.SUBSCRIPTION_TIMEOUT_SECONDS_FIELD;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.pgm.init.api.ConsoleUI;
+import com.google.gerrit.pgm.init.api.InitStep;
+import com.google.gerrit.pgm.init.api.Section;
+import com.google.gerrit.server.config.GerritInstanceIdProvider;
+import com.google.inject.Inject;
+
+/**
+ * Gerrit init step that prompts for the settings of this plugin, using the
+ * {@code plugin.<pluginName>} section obtained from {@link Section.Factory}.
+ */
+public class InitConfig implements InitStep {
+  private final Section pluginSection;
+  private final String pluginName;
+  private final ConsoleUI ui;
+  private final GerritInstanceIdProvider gerritInstanceIdProvider;
+
+  @Inject
+  InitConfig(
+      Section.Factory sections,
+      @PluginName String pluginName,
+      GerritInstanceIdProvider gerritInstanceIdProvider,
+      ConsoleUI ui) {
+    this.pluginName = pluginName;
+    this.ui = ui;
+    this.gerritInstanceIdProvider = gerritInstanceIdProvider;
+    this.pluginSection = sections.get("plugin", pluginName);
+  }
+
+  /**
+   * Asks for the optional settings first, then for the mandatory ones. The stream events topic
+   * is only asked for when sending stream events is enabled.
+   */
+  @Override
+  public void run() throws Exception {
+    ui.header(String.format("%s plugin", pluginName));
+
+    // Offer the value already held by the section as the default answer. Always offering
+    // DEFAULT_SEND_STREAM_EVENTS would reset an enabled setting when the default is accepted.
+    boolean sendStreamEvents =
+        ui.yesno(configuredSendStreamEvents(), "Should send stream events?");
+    pluginSection.set(SEND_STREAM_EVENTS_FIELD, Boolean.toString(sendStreamEvents));
+
+    if (sendStreamEvents) {
+      pluginSection.string(
+          "Stream events topic", STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+    }
+
+    pluginSection.string(
+        "Number of subscribers", NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS);
+
+    pluginSection.string(
+        "Timeout for subscriber ACKs (secs)",
+        ACK_DEADLINE_SECONDS_FIELD,
+        DEFAULT_ACK_DEADLINE_SECONDS);
+
+    pluginSection.string(
+        "Timeout for subscriber connection (secs)",
+        SUBSCRIPTION_TIMEOUT_SECONDS_FIELD,
+        DEFAULT_SUBSCTIPRION_TIMEOUT);
+
+    pluginSection.string(
+        "Timeout for subscriber shutdown (secs)",
+        SHUTDOWN_TIMEOUT_SECONDS_FIELD,
+        DEFAULT_SHUTDOWN_TIMEOUT);
+
+    mandatoryField(GCLOUD_PROJECT_FIELD, "Gcloud Project name", null);
+    mandatoryField(SUBSCRIPTION_ID_FIELD, "Subscriber Id", gerritInstanceIdProvider.get());
+    mandatoryField(PRIVATE_KEY_LOCATION_FIELD, "Private key location", null);
+  }
+
+  /**
+   * Returns the {@code sendStreamEvents} value currently held by the plugin section, or {@link
+   * PubSubConfiguration#DEFAULT_SEND_STREAM_EVENTS} when the section has no value for it.
+   */
+  private boolean configuredSendStreamEvents() {
+    String configured = pluginSection.get(SEND_STREAM_EVENTS_FIELD);
+    if (Strings.isNullOrEmpty(configured)) {
+      return DEFAULT_SEND_STREAM_EVENTS;
+    }
+    // Accept the usual git-config spellings of "true"; anything else counts as false.
+    String value = configured.trim();
+    return value.equalsIgnoreCase("true")
+        || value.equalsIgnoreCase("yes")
+        || value.equalsIgnoreCase("on")
+        || value.equals("1");
+  }
+
+  /**
+   * Prompts for a setting that must not be left empty.
+   *
+   * <p>When the console is interactive the prompt is repeated until a non-empty value is given.
+   * In batch mode a fatal message is printed to stderr instead.
+   *
+   * @param fieldName configuration key within the plugin section
+   * @param description prompt text passed to {@link Section#string}
+   * @param dv default value passed to the prompt, or {@code null} for none
+   */
+  private void mandatoryField(String fieldName, String description, String dv) {
+    String providedValue = pluginSection.string(description, fieldName, dv);
+
+    while (Strings.isNullOrEmpty(providedValue) && !ui.isBatch()) {
+      // ConsoleUI#message does not append a line break; end the line explicitly so the notice
+      // does not run into the prompt that follows.
+      ui.message("'%s' is mandatory. Please specify a value.\n", fieldName);
+      providedValue = pluginSection.string(description, fieldName, dv);
+    }
+
+    // An empty value can only survive the loop above in batch mode.
+    if (Strings.isNullOrEmpty(providedValue)) {
+      System.err.printf(
+          "FATAL [%s plugin]: Could not set '%s' in batch mode. %s will not work%n",
+          pluginName, fieldName, pluginName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
index 50fad61..24663b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -30,17 +30,25 @@
 
   private PubSubApiModule pubSubApiModule;
   private EnvironmentChecker environmentChecker;
+  private final PubSubConfiguration configuration;
 
   @Inject
-  public Module(PubSubApiModule pubSubApiModule, EnvironmentChecker environmentChecker) {
+  public Module(
+      PubSubApiModule pubSubApiModule,
+      EnvironmentChecker environmentChecker,
+      PubSubConfiguration configuration) {
     this.pubSubApiModule = pubSubApiModule;
     this.environmentChecker = environmentChecker;
+    this.configuration = configuration;
   }
 
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
-    DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+
+    if (configuration.isSendStreamEvents()) {
+      DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+    }
     factory(PubSubPublisher.Factory.class);
     factory(PubSubEventSubscriber.Factory.class);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
index 614d0aa..8203f5c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -24,11 +24,22 @@
 
 @Singleton
 public class PubSubConfiguration {
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
-  private static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
-  private static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
-  private static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
-  private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+  static final String GCLOUD_PROJECT_FIELD = "gcloudProject";
+  static final String SUBSCRIPTION_ID_FIELD = "subscriptionId";
+  static final String PRIVATE_KEY_LOCATION_FIELD = "privateKeyLocation";
+  static final String STREAM_EVENTS_TOPIC_FIELD = "streamEventsTopic";
+  static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
+  static final String NUMBER_OF_SUBSCRIBERS_FIELD = "numberOfSubscribers";
+  static final String ACK_DEADLINE_SECONDS_FIELD = "ackDeadlineSeconds";
+  static final String SUBSCRIPTION_TIMEOUT_SECONDS_FIELD = "subscribtionTimeoutInSeconds";
+  static final String SHUTDOWN_TIMEOUT_SECONDS_FIELD = "shutdownTimeoutInSeconds";
+
+  static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
+  static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
+  static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
+  static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+  static final boolean DEFAULT_SEND_STREAM_EVENTS = false;
 
   private final String gcloudProject;
   private final String subscriptionId;
@@ -39,6 +50,7 @@
   private final Long shutdownTimeoutInSeconds;
   private final String streamEventsTopic;
   private final PluginConfig fromGerritConfig;
+  private final boolean sendStreamEvents;
 
   @Inject
   public PubSubConfiguration(
@@ -46,24 +58,26 @@
       @PluginName String pluginName,
       @Nullable @GerritInstanceId String instanceId) {
     this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    this.gcloudProject = getMandatoryString("gcloudProject");
-    this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
-    this.privateKeyLocation = getMandatoryString("privateKeyLocation");
+    this.gcloudProject = getMandatoryString(GCLOUD_PROJECT_FIELD);
+    this.subscriptionId = getMandatoryString(SUBSCRIPTION_ID_FIELD, instanceId);
+    this.privateKeyLocation = getMandatoryString(PRIVATE_KEY_LOCATION_FIELD);
     this.streamEventsTopic =
-        fromGerritConfig.getString("streamEventsTopic", DEFAULT_STREAM_EVENTS_TOPIC);
+        fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, DEFAULT_STREAM_EVENTS_TOPIC);
+    this.sendStreamEvents =
+        fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, DEFAULT_SEND_STREAM_EVENTS);
     this.numberOfSubscribers =
         Integer.parseInt(
-            fromGerritConfig.getString("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+            fromGerritConfig.getString(NUMBER_OF_SUBSCRIBERS_FIELD, DEFAULT_NUMBER_OF_SUBSCRIBERS));
     this.ackDeadlineSeconds =
         Integer.parseInt(
-            fromGerritConfig.getString("ackDeadlineSeconds", DEFAULT_ACK_DEADLINE_SECONDS));
+            fromGerritConfig.getString(ACK_DEADLINE_SECONDS_FIELD, DEFAULT_ACK_DEADLINE_SECONDS));
     this.subscribtionTimeoutInSeconds =
         Long.parseLong(
             fromGerritConfig.getString(
-                "subscribtionTimeoutInSeconds", DEFAULT_SUBSCTIPRION_TIMEOUT));
+                SUBSCRIPTION_TIMEOUT_SECONDS_FIELD, DEFAULT_SUBSCTIPRION_TIMEOUT));
     this.shutdownTimeoutInSeconds =
         Long.parseLong(
-            fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
+            fromGerritConfig.getString(SHUTDOWN_TIMEOUT_SECONDS_FIELD, DEFAULT_SHUTDOWN_TIMEOUT));
   }
 
   public String getGCloudProject() {
@@ -110,4 +124,8 @@
     }
     return value;
   }
+
+  public boolean isSendStreamEvents() {
+    return sendStreamEvents;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index df69922..2a030c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutureCallback;
 import com.google.api.core.ApiFutures;
@@ -67,10 +66,6 @@
     return publish(gson.toJson(event));
   }
 
-  public ListenableFuture<Boolean> publish(EventMessage event) {
-    return publish(gson.toJson(event));
-  }
-
   private ListenableFuture<Boolean> publish(String eventPayload) {
     ByteString data = ByteString.copyFromUtf8(eventPayload);
     PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f3263d3..ef5677d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -47,3 +47,27 @@
 :   Optional. Name of the GCloud PubSub topic for stream events. events-gcloud-pubsub plugin exposes
     all stream events under this topic name.
     Default: gerrit
+
+`plugin.events-gcloud-pubsub.sendStreamEvents`
+:   Optional. Whether to send stream events to the topic configured by `streamEventsTopic`.
+    Default: false
+
+Gerrit init integration
+-----------------------
+
+The @PLUGIN@ plugin provides an init step that helps to set up the configuration.
+
+```
+*** events-gcloud-pubsub plugin
+***
+
+Should send stream events?     [y/N]? y
+Stream events topic            [gerrit]:
+Number of subscribers          [6]:
+Timeout for subscriber ACKs (secs) [10]:
+Timeout for subscriber connection (secs) [10]:
+Timeout for subscriber shutdown (secs) [10]:
+Gcloud Project name            : some_project
+Subscriber Id                  [6f174800-b1fa-477f-af49-26734f433280]:
+Private key location           : /path/to/service-account-key.json
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index d202f25..e27e946 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -135,6 +135,7 @@
   @GerritConfig(
       name = "plugin.events-gcloud-pubsub.privateKeyLocation",
       value = PRIVATE_KEY_LOCATION)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "true")
   public void shouldProduceStreamEvents() throws Exception {
     String subscriptionId = "gerrit-subscription-id";
     String topicId = "gerrit";
@@ -165,6 +166,26 @@
   @GerritConfig(
       name = "plugin.events-gcloud-pubsub.privateKeyLocation",
       value = PRIVATE_KEY_LOCATION)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.sendStreamEvents", value = "false")
+  public void shouldNotProduceStreamEventsWhenDisabled() throws Exception {
+    String subscriptionId = "gerrit-subscription-id";
+    String topicId = "gerrit";
+    createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
+
+    createChange();
+
+    readMessageAndValidate(
+        (pullResponse) -> assertThat(pullResponse.getReceivedMessagesList()).isEmpty(),
+        PROJECT_ID,
+        subscriptionId);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
   public void shouldConsumeEvent() throws InterruptedException {
     Event event = new ProjectCreatedEvent();
     event.instanceId = DEFAULT_INSTANCE_ID;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
index 97a2c8d..d88f9d4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -19,13 +19,12 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.api.core.ApiFutures;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import java.io.IOException;
-import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,9 +41,7 @@
   @Mock PubSubPublisherMetrics pubSubPublisherMetricsMock;
 
   private static final String TOPIC = "foo";
-  private static final EventMessage eventMessage =
-      new EventMessage(
-          new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+  private static final Event eventMessage = new ProjectCreatedEvent();
 
   @Before
   public void setUp() throws IOException {