Merge "Make send method async" into stable-3.4
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
index a74bdc2..a3a13a5 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
@@ -15,6 +15,7 @@
package com.gerritforge.gerrit.eventbroker;
import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import java.util.Set;
import java.util.UUID;
@@ -36,13 +37,13 @@
}
/**
- * Send an message to a topic.
+ * Send a message to a topic.
*
* @param topic topic name
* @param message to be send to the topic
- * @return true if the message was successfully sent. False otherwise.
+ * @return a future that returns when the message has been sent.
*/
- boolean send(String topic, EventMessage message);
+ ListenableFuture<Boolean> send(String topic, EventMessage message);
/**
* Receive asynchronously a message from a topic.
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index c8daa0e..1e5379e 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -22,6 +22,8 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -43,17 +45,18 @@
}
@Override
- public boolean send(String topic, EventMessage message) {
+ public ListenableFuture<Boolean> send(String topic, EventMessage message) {
EventBus topicEventConsumers = eventBusMap.get(topic);
- try {
- if (topicEventConsumers != null) {
- topicEventConsumers.post(message);
- }
- } catch (RuntimeException e) {
- log.atSevere().withCause(e).log();
- return false;
+ SettableFuture<Boolean> future = SettableFuture.create();
+
+ if (topicEventConsumers != null) {
+ topicEventConsumers.post(message);
+ future.set(true);
+ } else {
+ future.set(false);
}
- return true;
+
+ return future;
}
@Override
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
index dd0edfa..e0c753a 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
@@ -29,6 +29,9 @@
import com.google.gson.JsonObject;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +44,7 @@
@RunWith(MockitoJUnitRunner.class)
public class BrokerApiTest {
+ public static final int SEND_FUTURE_TIMEOUT = 1;
@Captor ArgumentCaptor<EventMessage> eventCaptor;
Consumer<EventMessage> eventConsumer;
@@ -55,12 +59,13 @@
}
@Test
- public void shouldSendEvent() {
+ public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
ProjectCreatedEvent event = new ProjectCreatedEvent();
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+ assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
+
compareWithExpectedEvent(eventConsumer, eventCaptor, event);
}
@@ -69,7 +74,8 @@
}
@Test
- public void shouldRegisterConsumerPerTopic() {
+ public void shouldRegisterConsumerPerTopic()
+ throws InterruptedException, TimeoutException, ExecutionException {
Consumer<EventMessage> secondConsumer = mockEventConsumer();
ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
@@ -78,8 +84,12 @@
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
- brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+ brokerApiUnderTest
+ .send("topic2", wrap(eventForTopic2))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
@@ -108,7 +118,8 @@
}
@Test
- public void shouldDeliverEventToAllRegisteredConsumers() {
+ public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
+ throws InterruptedException, TimeoutException, ExecutionException {
Consumer<EventMessage> secondConsumer = mockEventConsumer();
ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
@@ -116,45 +127,54 @@
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
brokerApiUnderTest.receiveAsync("topic", secondConsumer);
- brokerApiUnderTest.send("topic", wrap(event));
+ brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, eventCaptor, event);
compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
}
@Test
- public void shouldReceiveEventsOnlyFromRegisteredTopic() {
+ public void shouldReceiveEventsOnlyFromRegisteredTopic()
+ throws InterruptedException, TimeoutException, ExecutionException {
ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
- brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+ brokerApiUnderTest
+ .send("topic2", wrap(eventForTopic2))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
}
@Test
- public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+ public void shouldNotRegisterTheSameConsumerTwicePerTopic()
+ throws InterruptedException, TimeoutException, ExecutionException {
ProjectCreatedEvent event = new ProjectCreatedEvent();
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.send("topic", wrap(event));
+ brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, eventCaptor, event);
}
@Test
- public void shouldReconnectSubscribers() {
+ public void shouldReconnectSubscribers()
+ throws InterruptedException, TimeoutException, ExecutionException {
ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
@@ -164,26 +184,32 @@
brokerApiUnderTest.disconnect();
brokerApiUnderTest.receiveAsync("topic", newConsumer);
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
verify(eventConsumer, never()).accept(eventCaptor.capture());
}
@Test
- public void shouldDisconnectSubscribers() {
+ public void shouldDisconnectSubscribers()
+ throws InterruptedException, TimeoutException, ExecutionException {
ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
brokerApiUnderTest.disconnect();
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
verify(eventConsumer, never()).accept(eventCaptor.capture());
}
@Test
- public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers() {
+ public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
+ throws InterruptedException, TimeoutException, ExecutionException {
ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
@@ -194,22 +220,29 @@
clearInvocations(eventConsumer);
- brokerApiUnderTest.send("topic", wrap(eventForTopic));
+ brokerApiUnderTest
+ .send("topic", wrap(eventForTopic))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
verify(eventConsumer, never()).accept(eventCaptor.capture());
clearInvocations(eventConsumer);
- secondaryBroker.send("topic", wrap(eventForTopic));
+ secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
}
@Test
- public void shouldReplayAllEvents() {
+ public void shouldReplayAllEvents()
+ throws InterruptedException, TimeoutException, ExecutionException {
ProjectCreatedEvent event = new ProjectCreatedEvent();
brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+ assertThat(
+ brokerApiUnderTest
+ .send("topic", wrap(event))
+ .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
+ .isTrue();
verify(eventConsumer, times(1)).accept(eventCaptor.capture());
compareWithExpectedEvent(eventConsumer, eventCaptor, event);