Merge "Make send method async" into stable-3.4
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
index a74bdc2..a3a13a5 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
@@ -15,6 +15,7 @@
 package com.gerritforge.gerrit.eventbroker;
 
 import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import java.util.Set;
 import java.util.UUID;
@@ -36,13 +37,13 @@
   }
 
   /**
-   * Send an message to a topic.
+   * Send a message to a topic.
    *
    * @param topic topic name
    * @param message to be send to the topic
-   * @return true if the message was successfully sent. False otherwise.
+   * @return a future that returns when the message has been sent.
    */
-  boolean send(String topic, EventMessage message);
+  ListenableFuture<Boolean> send(String topic, EventMessage message);
 
   /**
    * Receive asynchronously a message from a topic.
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index c8daa0e..1e5379e 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -22,6 +22,8 @@
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -43,17 +45,18 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, EventMessage message) {
     EventBus topicEventConsumers = eventBusMap.get(topic);
-    try {
-      if (topicEventConsumers != null) {
-        topicEventConsumers.post(message);
-      }
-    } catch (RuntimeException e) {
-      log.atSevere().withCause(e).log();
-      return false;
+    SettableFuture<Boolean> future = SettableFuture.create();
+
+    if (topicEventConsumers != null) {
+      topicEventConsumers.post(message);
+      future.set(true);
+    } else {
+      future.set(false);
     }
-    return true;
+
+    return future;
   }
 
   @Override
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
index dd0edfa..e0c753a 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
@@ -29,6 +29,9 @@
 import com.google.gson.JsonObject;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +44,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class BrokerApiTest {
 
+  public static final int SEND_FUTURE_TIMEOUT = 1;
   @Captor ArgumentCaptor<EventMessage> eventCaptor;
   Consumer<EventMessage> eventConsumer;
 
@@ -55,12 +59,13 @@
   }
 
   @Test
-  public void shouldSendEvent() {
+  public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
 
-    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+    assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
+
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
   }
 
@@ -69,7 +74,8 @@
   }
 
   @Test
-  public void shouldRegisterConsumerPerTopic() {
+  public void shouldRegisterConsumerPerTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
     Consumer<EventMessage> secondConsumer = mockEventConsumer();
     ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
 
@@ -78,8 +84,12 @@
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
-    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+    brokerApiUnderTest
+        .send("topic2", wrap(eventForTopic2))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
     compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
@@ -108,7 +118,8 @@
   }
 
   @Test
-  public void shouldDeliverEventToAllRegisteredConsumers() {
+  public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
+      throws InterruptedException, TimeoutException, ExecutionException {
     Consumer<EventMessage> secondConsumer = mockEventConsumer();
     ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
 
@@ -116,45 +127,54 @@
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic", secondConsumer);
-    brokerApiUnderTest.send("topic", wrap(event));
+    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
     compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
   }
 
   @Test
-  public void shouldReceiveEventsOnlyFromRegisteredTopic() {
+  public void shouldReceiveEventsOnlyFromRegisteredTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
-    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+    brokerApiUnderTest
+        .send("topic2", wrap(eventForTopic2))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
   }
 
   @Test
-  public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+  public void shouldNotRegisterTheSameConsumerTwicePerTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(event));
+    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
   }
 
   @Test
-  public void shouldReconnectSubscribers() {
+  public void shouldReconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
 
@@ -164,26 +184,32 @@
 
     brokerApiUnderTest.disconnect();
     brokerApiUnderTest.receiveAsync("topic", newConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
     verify(eventConsumer, never()).accept(eventCaptor.capture());
   }
 
   @Test
-  public void shouldDisconnectSubscribers() {
+  public void shouldDisconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.disconnect();
 
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     verify(eventConsumer, never()).accept(eventCaptor.capture());
   }
 
   @Test
-  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers() {
+  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
@@ -194,22 +220,29 @@
 
     clearInvocations(eventConsumer);
 
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
     verify(eventConsumer, never()).accept(eventCaptor.capture());
 
     clearInvocations(eventConsumer);
-    secondaryBroker.send("topic", wrap(eventForTopic));
+    secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
   }
 
   @Test
-  public void shouldReplayAllEvents() {
+  public void shouldReplayAllEvents()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
 
-    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+    assertThat(
+            brokerApiUnderTest
+                .send("topic", wrap(event))
+                .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
+        .isTrue();
 
     verify(eventConsumer, times(1)).accept(eventCaptor.capture());
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);