Merge branch 'stable-3.4' into stable-3.5
* stable-3.4:
Remove Maven legacy pom.xml
Make events-broker a Gerrit plugin
Provide subscribers with consumer's group id
Remove actual implementation of InProcessBrokerApi
Set event-broker and gerrit version to v3.4.8
Add interface ExtendedBrokerApi to consume messages with groupId
Change-Id: Ica5f4f1a8d8d2a86b96266e0198e4721fcac15ce
diff --git a/BUILD b/BUILD
index 6ac0fac..86e9d62 100644
--- a/BUILD
+++ b/BUILD
@@ -7,12 +7,13 @@
"PLUGIN_DEPS",
"PLUGIN_DEPS_NEVERLINK",
"PLUGIN_TEST_DEPS",
+ "gerrit_plugin",
)
-java_library(
+gerrit_plugin(
name = "events-broker",
srcs = glob(["src/main/java/**/*.java"]),
- deps = PLUGIN_DEPS_NEVERLINK,
+ deps = [],
)
junit_tests(
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index 4478cc8..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,174 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.gerritforge</groupId>
- <artifactId>events-broker</artifactId>
- <version>3.5.6</version>
- <packaging>jar</packaging>
-
- <name>events-broker</name>
- <description>Events Broker API for use with Gerrit Code Review</description>
- <url>https://gerrit.googlesource.com/modules/events-broker</url>
-
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <scm>
- <url>https://gerrit.googlesource.com/modules/events-broker</url>
- <connection>https://gerrit.googlesource.com/modules/events-broker.git</connection>
- </scm>
-
- <developers>
- <developer>
- <name>Luca Milanesio</name>
- </developer>
- <developer>
- <name>Marcin Czech</name>
- </developer>
- <developer>
- <name>Antonio Barone</name>
- </developer>
- </developers>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <auto-value.version>1.7.4</auto-value.version>
- <gerrit.version>${project.version}</gerrit.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.google.gerrit</groupId>
- <artifactId>gerrit-plugin-api</artifactId>
- <version>${gerrit.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.gerrit</groupId>
- <artifactId>gerrit-acceptance-framework</artifactId>
- <version>${gerrit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15on</artifactId>
- <version>1.61</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <annotationProcessorPaths>
- <path>
- <groupId>com.google.auto.value</groupId>
- <artifactId>auto-value</artifactId>
- <version>${auto-value.version}</version>
- </path>
- </annotationProcessorPaths>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>3.1.0</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>3.1.1</version>
- <configuration>
- <!-- Workaround to https://bugs.openjdk.java.net/browse/JDK-8212233 -->
- <source>8</source>
- </configuration>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-release-plugin</artifactId>
- <version>2.5.3</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.6</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.theoryinpractise</groupId>
- <artifactId>googleformatter-maven-plugin</artifactId>
- <version>1.7.3</version>
- <executions>
- <execution>
- <id>reformat-sources</id>
- <configuration>
- <includeStale>false</includeStale>
- <style>GOOGLE</style>
- <formatMain>true</formatMain>
- <formatTest>true</formatTest>
- <filterModified>false</filterModified>
- <skip>false</skip>
- <fixImports>true</fixImports>
- <maxLineLength>100</maxLineLength>
- </configuration>
- <goals>
- <goal>format</goal>
- </goals>
- <phase>process-sources</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <distributionManagement>
- <repository>
- <id>sonatype-nexus-staging</id>
- <name>Sonatype Nexus Staging</name>
- <url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
- </repository>
- </distributionManagement>
-</project>
-
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java
new file mode 100644
index 0000000..fa7cb74
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public interface ExtendedBrokerApi extends BrokerApi {
+
+ /**
+ * Asynchronously receive a message from a topic using a consumer's group id.
+ *
+ * @param topic topic name
+ * @param groupId the identifier of the group that the consumer belongs to for that topic
+ * @param consumer an operation that accepts and processes a single message
+ */
+ void receiveAsync(String topic, String groupId, Consumer<Event> consumer);
+
+ /**
+ * Get the active subscribers with their consumer's group id.
+ *
+ * @return {@link Set} of the topic subscribers together with their consumer's group id.
+ */
+ Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId();
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index a039d24..804a2fd 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -15,65 +15,40 @@
package com.gerritforge.gerrit.eventbroker;
import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId.topicSubscriberWithGroupId;
-import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapMaker;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
-public class InProcessBrokerApi implements BrokerApi {
+public class InProcessBrokerApi implements ExtendedBrokerApi {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
- private static final Integer DEFAULT_MESSAGE_QUEUE_SIZE = 100;
-
- private final Map<String, EvictingQueue<Event>> messagesQueueMap;
- private final Map<String, EventBus> eventBusMap;
private final Set<TopicSubscriber> topicSubscribers;
+ private final Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId;
public InProcessBrokerApi() {
- this.eventBusMap = new MapMaker().concurrencyLevel(1).makeMap();
- this.messagesQueueMap = new MapMaker().concurrencyLevel(1).makeMap();
this.topicSubscribers = new HashSet<>();
+ this.topicSubscribersWithGroupId = new HashSet<>();
}
@Override
public ListenableFuture<Boolean> send(String topic, Event message) {
- EventBus topicEventConsumers = eventBusMap.get(topic);
- SettableFuture<Boolean> future = SettableFuture.create();
-
- if (topicEventConsumers != null) {
- topicEventConsumers.post(message);
- future.set(true);
- } else {
- future.set(false);
- }
-
- return future;
+ return unsupported();
}
@Override
public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
- EventBus topicEventConsumers = eventBusMap.get(topic);
- if (topicEventConsumers == null) {
- topicEventConsumers = new EventBus(topic);
- eventBusMap.put(topic, topicEventConsumers);
- }
-
- topicEventConsumers.register(eventConsumer);
topicSubscribers.add(topicSubscriber(topic, eventConsumer));
+ }
- EvictingQueue<Event> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
- messagesQueueMap.put(topic, messageQueue);
- topicEventConsumers.register(new EventBusMessageRecorder(messageQueue));
+ @Override
+ public void receiveAsync(String topic, String groupId, Consumer<Event> eventConsumer) {
+ topicSubscribersWithGroupId.add(
+ topicSubscriberWithGroupId(groupId, topicSubscriber(topic, eventConsumer)));
}
@Override
@@ -82,29 +57,23 @@
}
@Override
+ public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+ return ImmutableSet.copyOf(topicSubscribersWithGroupId);
+ }
+
+ @Override
public void disconnect() {
- this.eventBusMap.clear();
+ this.topicSubscribers.clear();
+ this.topicSubscribersWithGroupId.clear();
}
@Override
public void replayAllEvents(String topic) {
- if (messagesQueueMap.containsKey(topic)) {
- messagesQueueMap.get(topic).stream().forEach(eventMessage -> send(topic, eventMessage));
- }
+ unsupported();
}
- private static class EventBusMessageRecorder {
- private EvictingQueue messagesQueue;
-
- public EventBusMessageRecorder(EvictingQueue messagesQueue) {
- this.messagesQueue = messagesQueue;
- }
-
- @Subscribe
- public void recordCustomerChange(Event e) {
- if (!messagesQueue.contains(e)) {
- messagesQueue.add(e);
- }
- }
+ private <T> T unsupported() {
+ throw new UnsupportedOperationException(
+ "InProcessBrokerApi is not intended to be used as a real broker");
}
}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java
new file mode 100644
index 0000000..3ce2d56
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class TopicSubscriberWithGroupId {
+ public static TopicSubscriberWithGroupId topicSubscriberWithGroupId(
+ String groupId, TopicSubscriber topicSubscriber) {
+ return new AutoValue_TopicSubscriberWithGroupId(groupId, topicSubscriber);
+ }
+
+ public abstract String groupId();
+
+ public abstract TopicSubscriber topicSubscriber();
+}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
deleted file mode 100644
index 5f6c73d..0000000
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.gerrit.eventbroker;
-
-import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.eventbus.Subscribe;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectCreatedEvent;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BrokerApiTest {
-
- public static final int SEND_FUTURE_TIMEOUT = 1;
- @Captor ArgumentCaptor<Event> eventCaptor;
- Consumer<Event> eventConsumer;
-
- BrokerApi brokerApiUnderTest;
- UUID instanceId = UUID.randomUUID();
- private Gson gson = new Gson();
-
- @Before
- public void setup() {
- brokerApiUnderTest = new InProcessBrokerApi();
- eventConsumer = mockEventConsumer();
- }
-
- @Test
- public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
- assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- private Event wrap(ProjectCreatedEvent event) {
- return event;
- }
-
- @Test
- public void shouldRegisterConsumerPerTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
- Consumer<Event> secondConsumer = mockEventConsumer();
- ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
- ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- brokerApiUnderTest
- .send("topic2", wrap(eventForTopic2))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
- compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
- }
-
- @Test
- public void shouldReturnMapOfConsumersPerTopic() {
- Consumer<Event> firstConsumerTopicA = mockEventConsumer();
-
- Consumer<Event> secondConsumerTopicA = mockEventConsumer();
- Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
-
- brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
- brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
- brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
-
- Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
-
- assertThat(consumersMap).isNotNull();
- assertThat(consumersMap).isNotEmpty();
- assertThat(consumersMap)
- .containsExactly(
- topicSubscriber("TopicA", firstConsumerTopicA),
- topicSubscriber("TopicA", secondConsumerTopicA),
- topicSubscriber("TopicB", thirdConsumerTopicB));
- }
-
- @Test
- public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
- throws InterruptedException, TimeoutException, ExecutionException {
- Consumer<Event> secondConsumer = mockEventConsumer();
- ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent event = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic", secondConsumer);
- brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
- }
-
- @Test
- public void shouldReceiveEventsOnlyFromRegisteredTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- brokerApiUnderTest
- .send("topic2", wrap(eventForTopic2))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
- }
-
- @Test
- public void shouldNotRegisterTheSameConsumerTwicePerTopic()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- @Test
- public void shouldReconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
-
- Consumer<Event> newConsumer = mockEventConsumer();
-
- clearInvocations(eventConsumer);
-
- brokerApiUnderTest.disconnect();
- brokerApiUnderTest.receiveAsync("topic", newConsumer);
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
- verify(eventConsumer, never()).accept(eventCaptor.capture());
- }
-
- @Test
- public void shouldDisconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
- brokerApiUnderTest.disconnect();
-
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- verify(eventConsumer, never()).accept(eventCaptor.capture());
- }
-
- @Test
- public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
- throws InterruptedException, TimeoutException, ExecutionException {
- ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
- ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
- BrokerApi secondaryBroker = new InProcessBrokerApi();
- brokerApiUnderTest.disconnect();
- secondaryBroker.receiveAsync("topic", eventConsumer);
-
- clearInvocations(eventConsumer);
-
- brokerApiUnderTest
- .send("topic", wrap(eventForTopic))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
- verify(eventConsumer, never()).accept(eventCaptor.capture());
-
- clearInvocations(eventConsumer);
- secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
- }
-
- @Test
- public void shouldReplayAllEvents()
- throws InterruptedException, TimeoutException, ExecutionException {
- ProjectCreatedEvent event = new ProjectCreatedEvent();
-
- brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
- assertThat(
- brokerApiUnderTest
- .send("topic", wrap(event))
- .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
- .isTrue();
-
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- reset(eventConsumer);
-
- brokerApiUnderTest.replayAllEvents("topic");
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- compareWithExpectedEvent(eventConsumer, eventCaptor, event);
- }
-
- @Test
- public void shouldSkipReplayAllEventsWhenTopicDoesNotExists() {
- brokerApiUnderTest.replayAllEvents("unexistentTopic");
- verify(eventConsumer, times(0)).accept(eventCaptor.capture());
- }
-
- private ProjectCreatedEvent testProjectCreatedEvent(String s) {
- ProjectCreatedEvent eventForTopic = new ProjectCreatedEvent();
- eventForTopic.projectName = s;
- return eventForTopic;
- }
-
- private interface Subscriber extends Consumer<Event> {
-
- @Override
- @Subscribe
- void accept(Event eventMessage);
- }
-
- @SuppressWarnings("unchecked")
- private <T> Consumer<T> mockEventConsumer() {
- return (Consumer<T>) Mockito.mock(Subscriber.class);
- }
-
- private void compareWithExpectedEvent(
- Consumer<Event> eventConsumer, ArgumentCaptor<Event> eventCaptor, Event expectedEvent) {
- verify(eventConsumer, times(1)).accept(eventCaptor.capture());
- assertThat(eventCaptor.getValue()).isEqualTo(expectedEvent);
- }
-
- private JsonObject eventToJson(Event event) {
- return gson.toJsonTree(event).getAsJsonObject();
- }
-}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
new file mode 100644
index 0000000..eee2bdf
--- /dev/null
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InProcessBrokerApiTest {
+
+ public static final int SEND_FUTURE_TIMEOUT = 1;
+ Consumer<Event> eventConsumer;
+
+ BrokerApi brokerApiUnderTest;
+ UUID instanceId = UUID.randomUUID();
+
+ @Before
+ public void setup() {
+ brokerApiUnderTest = new InProcessBrokerApi();
+ eventConsumer = mockEventConsumer();
+ }
+
+ @Test
+ public void sendEventShouldNotBeSupported() {
+ assertThrows(UnsupportedOperationException.class, () -> brokerApiUnderTest.send("topic", null));
+ }
+
+ @Test
+ public void shouldRegisterConsumerPerTopic() {
+ Consumer<Event> secondConsumer = mockEventConsumer();
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+ }
+
+ @Test
+ public void shouldReturnMapOfConsumersPerTopic() {
+ Consumer<Event> firstConsumerTopicA = mockEventConsumer();
+
+ Consumer<Event> secondConsumerTopicA = mockEventConsumer();
+ Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
+
+ brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
+ brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
+ brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
+
+ Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
+
+ assertThat(consumersMap).isNotNull();
+ assertThat(consumersMap).isNotEmpty();
+ assertThat(consumersMap)
+ .containsExactly(
+ topicSubscriber("TopicA", firstConsumerTopicA),
+ topicSubscriber("TopicA", secondConsumerTopicA),
+ topicSubscriber("TopicB", thirdConsumerTopicB));
+ }
+
+ @Test
+ public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers() {
+ Consumer<Event> secondConsumer = mockEventConsumer();
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic", secondConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+ }
+
+ @Test
+ public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+
+ assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void shouldReconnectSubscribers() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+
+ Consumer<Event> newConsumer = mockEventConsumer();
+
+ brokerApiUnderTest.disconnect();
+ assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+
+ brokerApiUnderTest.receiveAsync("topic", newConsumer);
+ assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+ }
+
+ @Test
+ public void shouldDisconnectSubscribers() {
+ brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+ brokerApiUnderTest.disconnect();
+ assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+ }
+
+ @Test
+ public void replayAllEventsShouldNotBeSupported() {
+ assertThrows(
+ UnsupportedOperationException.class, () -> brokerApiUnderTest.replayAllEvents("topic"));
+ }
+
+ private static class Subscriber<T> implements Consumer<T> {
+
+ @Override
+ public void accept(T eventMessage) {}
+ }
+
+ private <T> Consumer<T> mockEventConsumer() {
+ return new Subscriber<>();
+ }
+}