Add stream event publishing timeout
Add timeout configuration for publishing stream events.
Also use StreamEventPublisherModule to bind StreamEventPublisher.
Bug: Issue 14907
Change-Id: I0f452eec502589a2ec18a748d72eb14cf8304b15
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 06cb71c..bfffbfe 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,7 +9,7 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108041529",
- sha1 = "309fe8cc08c46593d9990d4e5c448cc85e5a62b0",
+ artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108301155",
+ sha1 = "ef4d94bb4ba1d136cd90ea901776f03a25bcb517",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 81e1c12..8497882 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -57,6 +57,8 @@
private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+ private static final long DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT = 30000;
+
private final Supplier<Cache> cache;
private final Supplier<Event> event;
private final Supplier<Index> index;
@@ -190,6 +192,17 @@
}
}
+ private static long getLong(
+ Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+ try {
+ return cfg.get().getLong(section, subSection, name, defaultValue);
+ } catch (IllegalArgumentException e) {
+ log.error("invalid value for {}; using default value {}", name, defaultValue);
+ log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
+ return defaultValue;
+ }
+ }
+
public static class Projects {
public static final String SECTION = "projects";
public static final String PATTERN_KEY = "pattern";
@@ -293,15 +306,28 @@
public static class Broker {
static final String BROKER_SECTION = "broker";
+ static final String STREAM_EVENT_PUBLISH_TIMEOUT = "streamEventPublishTimeoutMs";
private final Config cfg;
+ private long streamEventPublishTimeout;
Broker(Supplier<Config> cfgSupplier) {
cfg = cfgSupplier.get();
+ streamEventPublishTimeout =
+ getLong(
+ cfgSupplier,
+ BROKER_SECTION,
+ null,
+ STREAM_EVENT_PUBLISH_TIMEOUT,
+ DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT);
}
public String getTopic(String topicKey, String defValue) {
return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
}
+
+ public long getStreamEventPublishTimeout() {
+ return streamEventPublishTimeout;
+ }
}
static boolean getBoolean(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index 94d7867..b055f26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -14,13 +14,12 @@
package com.googlesource.gerrit.plugins.multisite.event;
-import com.gerritforge.gerrit.eventbroker.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherModule;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Names;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
@@ -36,10 +35,13 @@
@Override
protected void configure() {
DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdate.class);
- bind(new TypeLiteral<String>() {})
- .annotatedWith(Names.named(StreamEventPublisher.STREAM_EVENTS_TOPIC))
- .toInstance(EventTopic.STREAM_EVENT_TOPIC.topic(configuration));
- DynamicSet.bind(binder(), EventListener.class).to(StreamEventPublisher.class);
+ bind(StreamEventPublisherConfig.class)
+ .toInstance(
+ new StreamEventPublisherConfig(
+ EventTopic.STREAM_EVENT_TOPIC.topic(configuration),
+ configuration.broker().getStreamEventPublishTimeout()));
+
+ install(new StreamEventPublisherModule());
}
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7eedc6a..e217994 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -70,6 +70,10 @@
: Name of the topic to use for publishing cache eviction events
Defaults to GERRIT.EVENT.PROJECT.LIST
+```broker.streamEventPublishTimeoutMs```
+: The timeout in milliseconds for publishing stream events.
+ Defaults to 30000 (30 seconds).
+
```ref-database.enabled```
: Enable the use of a shared ref-database
Defaults: true