Introduce timeout for stream events publishing

Add message publishing timeout to avoid issue when broker is not
returning ACK when publishing the message.

Also move stream events publishing classes to a separate package.

Bug: Issue 14909
Change-Id: Ic07689f082726550daf4b6747b393aad8c6d267d
diff --git a/README.md b/README.md
index 16d3f24..f5f983b 100644
--- a/README.md
+++ b/README.md
@@ -14,27 +14,27 @@
 a generic stream events publisher which will perform the relevant operations.
 
 In order to listen and stream gerrit events, consumers of this API need to
-provide a named annotation with the name of the stream events topic and
+provide a binding for the `StreamEventPublisherConfig` configuration and
 `java.util.concurrent.Executor` binding annotated with `StreamEventPublisherExecutor`
 annotation. A default single threaded implementation (`StreamEventPublisherExecutor`)
 is provided by the library. The last step is to explicitly bind the Stream Events
 Publisher, as such:
 
 ```java
-import com.gerritforge.gerrit.eventbroker.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisher;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
 import com.google.inject.AbstractModule;
-import com.google.inject.TypeLiteral;
-import com.google.inject.name.Names;
 
 public class SomeModule extends AbstractModule {
     @Override
     protected void configure() {
-        bind(new TypeLiteral<String>() {
-        })
-                .annotatedWith(Names.named(StreamEventPublisher.STREAM_EVENTS_TOPIC))
-                .toInstance("name_of_the_stream_events_topic");
+        long messagePublishingTimeout = 1000L;
+
+        bind(StreamEventPublisherConfig.class)
+                .toInstance(new StreamEventPublisherConfig(
+                    "name_of_the_stream_events_topic",
+                    messagePublishingTimeout));
         
         bind(Executor.class).annotatedWith(StreamEventPublisherExecutor.class).toProvider(StreamEventPublisherExecutorProvider.class);
         DynamicSet.bind(binder(), EventListener.class).to(StreamEventPublisher.class);
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
similarity index 77%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
index e7db00b..f064e22 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisher.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisher.java
@@ -12,9 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.gerrit.eventbroker;
+package com.gerritforge.gerrit.eventbroker.publisher;
 
-import com.gerritforge.gerrit.eventbroker.executor.StreamEventPublisherExecutor;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.publisher.executor.StreamEventPublisherExecutor;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.config.GerritInstanceId;
@@ -22,28 +23,28 @@
 import com.google.gerrit.server.events.EventListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.google.inject.name.Named;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 
 @Singleton
 public class StreamEventPublisher implements EventListener {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
 
-  public static final String STREAM_EVENTS_TOPIC = "stream_events_topic";
   private final DynamicItem<BrokerApi> brokerApiDynamicItem;
-  private final String streamEventsTopic;
+  private final StreamEventPublisherConfig config;
   private final Executor executor;
   private final String instanceId;
 
   @Inject
   public StreamEventPublisher(
       DynamicItem<BrokerApi> brokerApi,
-      @Named(STREAM_EVENTS_TOPIC) String streamEventsTopic,
+      StreamEventPublisherConfig config,
       @StreamEventPublisherExecutor Executor executor,
       @Nullable @GerritInstanceId String instanceId) {
     this.brokerApiDynamicItem = brokerApi;
-    this.streamEventsTopic = streamEventsTopic;
+    this.config = config;
     this.executor = executor;
     this.instanceId = instanceId;
   }
@@ -67,13 +68,20 @@
     @Override
     public void run() {
       if (brokerApi != null && shouldSend(event)) {
+        String streamEventTopic = config.getStreamEventsTopic();
         try {
-          brokerApi.send(streamEventsTopic, event).get();
+          brokerApi
+              .send(streamEventTopic, event)
+              .get(config.getPublishingTimeoutInMillis(), TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          log.atSevere().withCause(e).log(
+              "Timeout when publishing event '{}' to topic '{}", event, streamEventTopic);
+
         } catch (Throwable e) {
           log.atSevere().withCause(e).log(
               "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
               event,
-              streamEventsTopic,
+              streamEventTopic,
               e.getMessage(),
               e.getStackTrace());
         }
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java
new file mode 100644
index 0000000..9558df9
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/StreamEventPublisherConfig.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker.publisher;
+
+public class StreamEventPublisherConfig {
+  private final String streamEventsTopic;
+  private final long publishingTimeoutInMillis;
+
+  public StreamEventPublisherConfig(String streamEventsTopic, long publishingTimeoutInMillis) {
+    this.streamEventsTopic = streamEventsTopic;
+    this.publishingTimeoutInMillis = publishingTimeoutInMillis;
+  }
+
+  public String getStreamEventsTopic() {
+    return streamEventsTopic;
+  }
+
+  public long getPublishingTimeoutInMillis() {
+    return publishingTimeoutInMillis;
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
similarity index 92%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
index 840ba07..c1289c1 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutor.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutor.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.gerrit.eventbroker.executor;
+package com.gerritforge.gerrit.eventbroker.publisher.executor;
 
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
similarity index 95%
rename from src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java
rename to src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
index e59ce6a..25394ae 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/executor/StreamEventPublisherExecutorProvider.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/publisher/executor/StreamEventPublisherExecutorProvider.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.gerrit.eventbroker.executor;
+package com.gerritforge.gerrit.eventbroker.publisher.executor;
 
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
index 3c26381..8f56ff1 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/StreamEventPublisherTest.java
@@ -20,6 +20,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisher;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -37,6 +39,9 @@
   @Mock private DynamicItem<BrokerApi> brokerApiDynamicItem;
   @Mock private BrokerApi brokerApi;
   private static final String STREAM_EVENTS_TOPIC = "stream-test-topic";
+  private static final long PUBLISHING_TIMEOUT = 1000L;
+  private static final StreamEventPublisherConfig config =
+      new StreamEventPublisherConfig(STREAM_EVENTS_TOPIC, PUBLISHING_TIMEOUT);
   private static final String INSTANCE_ID = "instance-id";
   private static final Executor EXECUTOR = MoreExecutors.directExecutor();
 
@@ -46,8 +51,7 @@
   public void setup() {
     when(brokerApiDynamicItem.get()).thenReturn(brokerApi);
     when(brokerApi.send(any(), any())).thenReturn(Futures.immediateFuture(true));
-    objectUnderTest =
-        new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, INSTANCE_ID);
+    objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, INSTANCE_ID);
   }
 
   @Test
@@ -64,8 +68,7 @@
     Event event = new ProjectCreatedEvent();
     event.instanceId = null;
 
-    objectUnderTest =
-        new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, null);
+    objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, null);
     objectUnderTest.onEvent(event);
     verify(brokerApi, times(1)).send(STREAM_EVENTS_TOPIC, event);
   }
@@ -75,8 +78,7 @@
     Event event = new ProjectCreatedEvent();
     event.instanceId = INSTANCE_ID;
 
-    objectUnderTest =
-        new StreamEventPublisher(brokerApiDynamicItem, STREAM_EVENTS_TOPIC, EXECUTOR, null);
+    objectUnderTest = new StreamEventPublisher(brokerApiDynamicItem, config, EXECUTOR, null);
     objectUnderTest.onEvent(event);
     verify(brokerApi, never()).send(STREAM_EVENTS_TOPIC, event);
   }