Remove actual implementation of InProcessBrokerApi
The InProcessBrokerApi main purpose was to allow to register
the events listeners independently from the actual implementation
of the broker-api.
The implementation went unfortunately well beyond the initial scope
and started to simulate a full in-process broker implementation
which would not make sense to use in production.
Also, the fact of silently mocking the real behaviour was potentially
leaving configuration and setup issues unnoticed with the drawback
of not notifing the Gerrit admins that something wasn't quite right.
Throw an exception of unsupported method for all of those calls
that are not supposed to be used as actual implementation of a
broker.
Change-Id: I92100d3d447aba32af58cd05e065affb402b7372
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index a039d24..e6a3c62 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -16,64 +16,30 @@
import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
-import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapMaker;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
public class InProcessBrokerApi implements BrokerApi {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
- private static final Integer DEFAULT_MESSAGE_QUEUE_SIZE = 100;
-
- private final Map<String, EvictingQueue<Event>> messagesQueueMap;
- private final Map<String, EventBus> eventBusMap;
private final Set<TopicSubscriber> topicSubscribers;
public InProcessBrokerApi() {
- this.eventBusMap = new MapMaker().concurrencyLevel(1).makeMap();
- this.messagesQueueMap = new MapMaker().concurrencyLevel(1).makeMap();
this.topicSubscribers = new HashSet<>();
}
@Override
public ListenableFuture<Boolean> send(String topic, Event message) {
- EventBus topicEventConsumers = eventBusMap.get(topic);
- SettableFuture<Boolean> future = SettableFuture.create();
-
- if (topicEventConsumers != null) {
- topicEventConsumers.post(message);
- future.set(true);
- } else {
- future.set(false);
- }
-
- return future;
+ return unsupported();
}
@Override
public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
- EventBus topicEventConsumers = eventBusMap.get(topic);
- if (topicEventConsumers == null) {
- topicEventConsumers = new EventBus(topic);
- eventBusMap.put(topic, topicEventConsumers);
- }
-
- topicEventConsumers.register(eventConsumer);
topicSubscribers.add(topicSubscriber(topic, eventConsumer));
-
- EvictingQueue<Event> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
- messagesQueueMap.put(topic, messageQueue);
- topicEventConsumers.register(new EventBusMessageRecorder(messageQueue));
}
@Override
@@ -83,28 +49,16 @@
@Override
public void disconnect() {
- this.eventBusMap.clear();
+ this.topicSubscribers.clear();
}
@Override
public void replayAllEvents(String topic) {
- if (messagesQueueMap.containsKey(topic)) {
- messagesQueueMap.get(topic).stream().forEach(eventMessage -> send(topic, eventMessage));
- }
+ unsupported();
}
- private static class EventBusMessageRecorder {
- private EvictingQueue messagesQueue;
-
- public EventBusMessageRecorder(EvictingQueue messagesQueue) {
- this.messagesQueue = messagesQueue;
- }
-
- @Subscribe
- public void recordCustomerChange(Event e) {
- if (!messagesQueue.contains(e)) {
- messagesQueue.add(e);
- }
- }
+ private <T> T unsupported() {
+ throw new UnsupportedOperationException(
+ "InProcessBrokerApi is not intended to be used as a real broker");
}
}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
deleted file mode 100644
index 5f6c73d..0000000
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.gerrit.eventbroker;
-
-import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.eventbus.Subscribe;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectCreatedEvent;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BrokerApiTest {
-
- public static final int SEND_FUTURE_TIMEOUT = 1;
- @Captor ArgumentCaptor<Event> eventCaptor;
- Consumer<Event> eventConsumer;
-
- BrokerApi brokerApiUnderTest;
- UUID instanceId = UUID.randomUUID();
- private Gson gson = new Gson();
-
- @Before
- public void setup() {
- brokerApiUnderTest = new InProcessBrokerApi();
- eventConsumer = mockEventConsumer();
- }
-
- @Test
- public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
- assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- private Event wrap(ProjectCreatedEvent event) {
- return event;
- }
-
- @Test
- public void shouldRegisterConsumerPerTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
- Consumer<Event> secondConsumer = mockEventConsumer();
- ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
- ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- brokerApiUnderTest
- .send("topic2", wrap(eventForTopic2))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
- compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
- }
-
- @Test
- public void shouldReturnMapOfConsumersPerTopic() {
- Consumer<Event> firstConsumerTopicA = mockEventConsumer();
-
- Consumer<Event> secondConsumerTopicA = mockEventConsumer();
- Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
-
- brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
- brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
- brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
-
- Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
-
- assertThat(consumersMap).isNotNull();
- assertThat(consumersMap).isNotEmpty();
- assertThat(consumersMap)
- .containsExactly(
- topicSubscriber("TopicA", firstConsumerTopicA),
- topicSubscriber("TopicA", secondConsumerTopicA),
- topicSubscriber("TopicB", thirdConsumerTopicB));
- }
-
- @Test
- public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
- throws InterruptedException, TimeoutException, ExecutionException {
- Consumer<Event> secondConsumer = mockEventConsumer();
- ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent event = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic", secondConsumer);
- brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
- }
-
- @Test
- public void shouldReceiveEventsOnlyFromRegisteredTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- brokerApiUnderTest
- .send("topic2", wrap(eventForTopic2))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
- }
-
- @Test
- public void shouldNotRegisterTheSameConsumerTwicePerTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- @Test
- public void shouldReconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
-
- Consumer<Event> newConsumer = mockEventConsumer();
-
- clearInvocations(eventConsumer);
-
- brokerApiUnderTest.disconnect();
- brokerApiUnderTest.receiveAsync("topic", newConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
- verify(eventConsumer, never()).accept(eventCaptor.capture());
- }
-
- @Test
- public void shouldDisconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.disconnect();
-
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- verify(eventConsumer, never()).accept(eventCaptor.capture());
- }
-
- @Test
- public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- BrokerApi secondaryBroker = new InProcessBrokerApi();
- brokerApiUnderTest.disconnect();
- secondaryBroker.receiveAsync("topic", eventConsumer);
-
- clearInvocations(eventConsumer);
-
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- verify(eventConsumer, never()).accept(eventCaptor.capture());
-
- clearInvocations(eventConsumer);
- secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
- }
-
- @Test
- public void shouldReplayAllEvents()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
- assertThat(
- brokerApiUnderTest
- .send("topic", wrap(event))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
- .isTrue();
-
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- reset(eventConsumer);
-
- brokerApiUnderTest.replayAllEvents("topic");
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- @Test
- public void shouldSkipReplayAllEventsWhenTopicDoesNotExists() {
- brokerApiUnderTest.replayAllEvents("unexistentTopic");
- verify(eventConsumer, times(0)).accept(eventCaptor.capture());
- }
-
- private ProjectCreatedEvent testProjectCreatedEvent(String s) {
- ProjectCreatedEvent eventForTopic = new ProjectCreatedEvent();
- eventForTopic.projectName = s;
- return eventForTopic;
- }
-
- private interface Subscriber extends Consumer<Event> {
-
- @Override
- @Subscribe
- void accept(Event eventMessage);
- }
-
- @SuppressWarnings("unchecked")
- private <T> Consumer<T> mockEventConsumer() {
- return (Consumer<T>) Mockito.mock(Subscriber.class);
- }
-
- private void compareWithExpectedEvent(
- Consumer<Event> eventConsumer, ArgumentCaptor<Event> eventCaptor, Event expectedEvent) {
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- assertThat(eventCaptor.getValue()).isEqualTo(expectedEvent);
- }
-
- private JsonObject eventToJson(Event event) {
- return gson.toJsonTree(event).getAsJsonObject();
- }
-}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
new file mode 100644
index 0000000..eee2bdf
--- /dev/null
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InProcessBrokerApiTest {
+
+ public static final int SEND_FUTURE_TIMEOUT = 1;
+ Consumer<Event> eventConsumer;
+
+ BrokerApi brokerApiUnderTest;
+ UUID instanceId = UUID.randomUUID();
+
+ @Before
+ public void setup() {
+ brokerApiUnderTest = new InProcessBrokerApi();
+ eventConsumer = mockEventConsumer();
+ }
+
+ @Test
+ public void sendEventShouldNotBeSupported() {
+ assertThrows(UnsupportedOperationException.class, () -> brokerApiUnderTest.send("topic", null));
+ }
+
+ @Test
+ public void shouldRegisterConsumerPerTopic() {
+ Consumer<Event> secondConsumer = mockEventConsumer();
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+ }
+
+ @Test
+ public void shouldReturnMapOfConsumersPerTopic() {
+ Consumer<Event> firstConsumerTopicA = mockEventConsumer();
+
+ Consumer<Event> secondConsumerTopicA = mockEventConsumer();
+ Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
+
+ brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
+ brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
+ brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
+
+ Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
+
+ assertThat(consumersMap).isNotNull();
+ assertThat(consumersMap).isNotEmpty();
+ assertThat(consumersMap)
+ .containsExactly(
+ topicSubscriber("TopicA", firstConsumerTopicA),
+ topicSubscriber("TopicA", secondConsumerTopicA),
+ topicSubscriber("TopicB", thirdConsumerTopicB));
+ }
+
+ @Test
+ public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers() {
+ Consumer<Event> secondConsumer = mockEventConsumer();
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic", secondConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+ }
+
+ @Test
+ public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void shouldReconnectSubscribers() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+
+ Consumer<Event> newConsumer = mockEventConsumer();
+
+ brokerApiUnderTest.disconnect();
+ assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+
+ brokerApiUnderTest.receiveAsync("topic", newConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+ }
+
+ @Test
+ public void shouldDisconnectSubscribers() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.disconnect();
+ assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+ }
+
+ @Test
+ public void replayAllEventsShouldNotBeSupported() {
+ assertThrows(
+ UnsupportedOperationException.class, () -> brokerApiUnderTest.replayAllEvents("topic"));
+ }
+
+ private static class Subscriber<T> implements Consumer<T> {
+
+ @Override
+ public void accept(T eventMessage) {}
+ }
+
+ private <T> Consumer<T> mockEventConsumer() {
+ return new Subscriber<>();
+ }
+}