Introduce timeout for stream events publishing
Add message publishing timeout to avoid issue when broker is not
returning ACK when publishing the message.
Also move stream events publishing classes to a separate package.
Bug: Issue 14909
Change-Id: Ic07689f082726550daf4b6747b393aad8c6d267d
diff --git a/README.md b/README.md
index 16d3f24..f5f983b 100644
--- a/README.md
+++ b/README.md
@@ -14,27 +14,27 @@
a generic stream events publisher which will perform the relevant operations.
In order to listen and stream gerrit events, consumers of this API need to
-provide a named annotation with the name of the stream events topic and
+provide a binding for the `StreamEventPublisherConfig` configuration and
`java.util.concurrent.Executor` binding annotated with `StreamEventPublisherExecutor`
annotation. A default single threaded implementation (`StreamEventPublisherExecutor`)
is provided by the library. The last step is to explicitly bind the Stream Events
Publisher, as such:
```java
-import com.gerritforge.gerrit.eventbroker.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisher;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventListener;
import com.google.inject.AbstractModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Names;
public class SomeModule extends AbstractModule {
@Override
protected void configure() {
- bind(new TypeLiteral<String>() {
- })
- .annotatedWith(Names.named(StreamEventPublisher.STREAM_EVENTS_TOPIC))
- .toInstance("name_of_the_stream_events_topic");
+ long messagePublishingTimeout = 1000L;
+
+ bind(StreamEventPublisherConfig.class)
+ .toInstance(new StreamEventPublisherConfig(
+ "name_of_the_stream_events_topic",
+ messagePublishingTimeout));
bind(Executor.class).annotatedWith(StreamEventPublisherExecutor.class).toProvider(StreamEventPublisherExecutorProvider.class);
DynamicSet.bind(binder(), EventListener.class).to(StreamEventPublisher.class);
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
similarity index 77%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
index e7db00b..f064e22 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.gerrit.eventbroker;
+package com.gerritforge.gerrit.eventbroker.publisher;
-import com.gerritforge.gerrit.eventbroker.executor.StreamEventPublisherExecutor;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.publisher.executor.StreamEventPublisherExecutor;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.config.GerritInstanceId;
@@ -22,28 +23,28 @@
import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.google.inject.name.Named;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
@Singleton
public class StreamEventPublisher implements EventListener {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
- public static final String STREAM_EVENTS_TOPIC = "stream_events_topic";
private final DynamicItem<BrokerApi> brokerApiDynamicItem;
- private final String streamEventsTopic;
+ private final StreamEventPublisherConfig config;
private final Executor executor;
private final String instanceId;
@Inject
public StreamEventPublisher(
DynamicItem<BrokerApi> brokerApi,
- @Named(STREAM_EVENTS_TOPIC) String streamEventsTopic,
+ StreamEventPublisherConfig config,
@StreamEventPublisherExecutor Executor executor,
@Nullable @GerritInstanceId String instanceId) {
this.brokerApiDynamicItem = brokerApi;
- this.streamEventsTopic = streamEventsTopic;
+ this.config = config;
this.executor = executor;
this.instanceId = instanceId;
}
@@ -67,13 +68,20 @@
@Override
public void run() {
if (brokerApi != null && shouldSend(event)) {
+ String streamEventTopic = config.getStreamEventsTopic();
try {
- brokerApi.send(streamEventsTopic, event).get();
+ brokerApi
+ .send(streamEventTopic, event)
+ .get(config.getPublishingTimeoutInMillis(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ log.atSevere().withCause(e).log(
+ "Timeout when publishing event '{}' to topic '{}", event, streamEventTopic);
+
} catch (Throwable e) {
log.atSevere().withCause(e).log(
"Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
event,
- streamEventsTopic,
+ streamEventTopic,
e.getMessage(),
e.getStackTrace());
}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java
new file mode 100644
index 0000000..9558df9
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker.publisher;
+
+public class StreamEventPublisherConfig {
+ private final String streamEventsTopic;
+ private final long publishingTimeoutInMillis;
+
+ public StreamEventPublisherConfig(String streamEventsTopic, long publishingTimeoutInMillis) {
+ this.streamEventsTopic = streamEventsTopic;
+ this.publishingTimeoutInMillis = publishingTimeoutInMillis;
+ }
+
+ public String getStreamEventsTopic() {
+ return streamEventsTopic;
+ }
+
+ public long getPublishingTimeoutInMillis() {
+ return publishingTimeoutInMillis;
+ }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
similarity index 92%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
index 840ba07..c1289c1 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.gerrit.eventbroker.executor;
+package com.gerritforge.gerrit.eventbroker.publisher.executor;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
similarity index 95%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
index e59ce6a..25394ae 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.gerrit.eventbroker.executor;
+package com.gerritforge.gerrit.eventbroker.publisher.executor;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
index 3c26381..8f56ff1 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
@@ -20,6 +20,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicItem;
@@ -37,6 +39,9 @@
@Mock private DynamicItem<BrokerApi> brokerApiDynamicItem;
@Mock private BrokerApi brokerApi;
private static final String STREAM_EVENTS_TOPIC = "stream-test-topic";
+ private static final long PUBLISHING_TIMEOUT = 1000L;
+ private static final StreamEventPublisherConfig config =
+ new StreamEventPublisherConfig(STREAM_EVENTS_TOPIC, PUBLISHING_TIMEOUT);
private static final String INSTANCE_ID = "instance-id";
private static final Executor EXECUTOR = MoreExecutors.directExecutor();
@@ -46,8 +51,7 @@
public void setup() {
when(brokerApiDynamicItem.get()).thenReturn(brokerApi);
when(brokerApi.send(any(), any())).thenReturn(Futures.immediateFuture(true));
- objectUnderTest =
- new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, INSTANCE_ID);
+ objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, INSTANCE_ID);
}
@Test
@@ -64,8 +68,7 @@
Event event = new ProjectCreatedEvent();
event.instanceId = null;
- objectUnderTest =
- new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, null);
+ objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, null);
objectUnderTest.onEvent(event);
verify(brokerApi, times(1)).send(STREAM_EVENTS_TOPIC, event);
}
@@ -75,8 +78,7 @@
Event event = new ProjectCreatedEvent();
event.instanceId = INSTANCE_ID;
- objectUnderTest =
- new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, null);
+ objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, null);
objectUnderTest.onEvent(event);
verify(brokerApi, never()).send(STREAM_EVENTS_TOPIC, event);
}