Add stream event publishing timeout

Add timeout configuration for publishing stream events.

Also use StreamEventPublisherModule to bind StreamEventPublisher.

Bug: Issue 14907
Change-Id: I0f452eec502589a2ec18a748d72eb14cf8304b15
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 06cb71c..bfffbfe 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,7 +9,7 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108041529",
-        sha1 = "309fe8cc08c46593d9990d4e5c448cc85e5a62b0",
+        artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108301155",
+        sha1 = "ef4d94bb4ba1d136cd90ea901776f03a25bcb517",
     )
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 81e1c12..8497882 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -57,6 +57,8 @@
   private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
 
+  private static final long DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT = 30000;
+
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
@@ -190,6 +192,17 @@
     }
   }
 
+  private static long getLong(
+      Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+    try {
+      return cfg.get().getLong(section, subSection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
   public static class Projects {
     public static final String SECTION = "projects";
     public static final String PATTERN_KEY = "pattern";
@@ -293,15 +306,28 @@
 
   public static class Broker {
     static final String BROKER_SECTION = "broker";
+    static final String STREAM_EVENT_PUBLISH_TIMEOUT = "streamEventPublishTimeoutMs";
     private final Config cfg;
+    private long streamEventPublishTimeout;
 
     Broker(Supplier<Config> cfgSupplier) {
       cfg = cfgSupplier.get();
+      streamEventPublishTimeout =
+          getLong(
+              cfgSupplier,
+              BROKER_SECTION,
+              null,
+              STREAM_EVENT_PUBLISH_TIMEOUT,
+              DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT);
     }
 
     public String getTopic(String topicKey, String defValue) {
       return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
     }
+
+    public long getStreamEventPublishTimeout() {
+      return streamEventPublishTimeout;
+    }
   }
 
   static boolean getBoolean(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index 94d7867..b055f26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -14,13 +14,12 @@
 
 package com.googlesource.gerrit.plugins.multisite.event;
 
-import com.gerritforge.gerrit.eventbroker.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherModule;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.events.EventListener;
 import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Names;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
@@ -36,10 +35,13 @@
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdate.class);
-    bind(new TypeLiteral<String>() {})
-        .annotatedWith(Names.named(StreamEventPublisher.STREAM_EVENTS_TOPIC))
-        .toInstance(EventTopic.STREAM_EVENT_TOPIC.topic(configuration));
 
-    DynamicSet.bind(binder(), EventListener.class).to(StreamEventPublisher.class);
+    bind(StreamEventPublisherConfig.class)
+        .toInstance(
+            new StreamEventPublisherConfig(
+                EventTopic.STREAM_EVENT_TOPIC.topic(configuration),
+                configuration.broker().getStreamEventPublishTimeout()));
+
+    install(new StreamEventPublisherModule());
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7eedc6a..e217994 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -70,6 +70,10 @@
 :   Name of the topic to use for publishing cache eviction events
     Defaults to GERRIT.EVENT.PROJECT.LIST
 
+```broker.streamEventPublishTimeoutMs```
+:   The timeout in milliseconds for publishing stream events.
+    Defaults to 30000 (30 seconds).
+
 ```ref-database.enabled```
 :   Enable the use of a shared ref-database
     Defaults: true