Merge branch 'stable-3.5' into stable-3.6

* stable-3.5:
  Set project and Gerrit version to v3.5.6
  Remove Maven legacy pom.xml
  Make events-broker a Gerrit plugin
  Provide subscribers with consumer's group id
  Remove actual implementation of InProcessBrokerApi
  Set event-broker and gerrit version to v3.4.8
  Add interface ExtendedBrokerApi to consume messages with groupId

Change-Id: I9f487679f9363081c86272a27bc88e10e8af9101
diff --git a/BUILD b/BUILD
index b409412..2d0d9ae 100644
--- a/BUILD
+++ b/BUILD
@@ -11,6 +11,7 @@
 gerrit_plugin(
     name = "events-broker",
     srcs = glob(["src/main/java/**/*.java"]),
+    deps = [],
 )
 
 junit_tests(
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index e8985e5..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.gerritforge</groupId>
-    <artifactId>events-broker</artifactId>
-    <version>3.6.3</version>
-    <packaging>jar</packaging>
-
-    <name>events-broker</name>
-    <description>Events Broker API for use with Gerrit Code Review</description>
-    <url>https://gerrit.googlesource.com/modules/events-broker</url>
-
-    <licenses>
-        <license>
-            <name>The Apache Software License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-            <distribution>repo</distribution>
-        </license>
-    </licenses>
-
-    <scm>
-        <url>https://gerrit.googlesource.com/modules/events-broker</url>
-        <connection>https://gerrit.googlesource.com/modules/events-broker.git</connection>
-    </scm>
-
-    <developers>
-        <developer>
-            <name>Luca Milanesio</name>
-        </developer>
-        <developer>
-            <name>Marcin Czech</name>
-        </developer>
-        <developer>
-            <name>Antonio Barone</name>
-        </developer>
-    </developers>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <auto-value.version>1.7.4</auto-value.version>
-        <gerrit.version>${project.version}</gerrit.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.google.gerrit</groupId>
-            <artifactId>gerrit-plugin-api</artifactId>
-            <version>${gerrit.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.gerrit</groupId>
-            <artifactId>gerrit-acceptance-framework</artifactId>
-            <version>${gerrit.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.bouncycastle</groupId>
-            <artifactId>bcprov-jdk15on</artifactId>
-            <version>1.61</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.6.1</version>
-                <configuration>
-                    <source>11</source>
-                    <target>11</target>
-                    <annotationProcessorPaths>
-                        <path>
-                            <groupId>com.google.auto.value</groupId>
-                            <artifactId>auto-value</artifactId>
-                            <version>${auto-value.version}</version>
-                        </path>
-                    </annotationProcessorPaths>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.1.0</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <version>3.4.0</version>
-                <executions>
-                    <execution>
-                        <id>attach-javadocs</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-release-plugin</artifactId>
-                <version>2.5.3</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-gpg-plugin</artifactId>
-                <version>1.6</version>
-                <executions>
-                    <execution>
-                        <id>sign-artifacts</id>
-                        <phase>verify</phase>
-                        <goals>
-                            <goal>sign</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>com.theoryinpractise</groupId>
-                <artifactId>googleformatter-maven-plugin</artifactId>
-                <version>1.7.3</version>
-                <executions>
-                    <execution>
-                        <id>reformat-sources</id>
-                        <configuration>
-                            <includeStale>false</includeStale>
-                            <style>GOOGLE</style>
-                            <formatMain>true</formatMain>
-                            <formatTest>true</formatTest>
-                            <filterModified>false</filterModified>
-                            <skip>false</skip>
-                            <fixImports>true</fixImports>
-                            <maxLineLength>100</maxLineLength>
-                        </configuration>
-                        <goals>
-                            <goal>format</goal>
-                        </goals>
-                        <phase>process-sources</phase>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-    <distributionManagement>
-        <repository>
-            <id>sonatype-nexus-staging</id>
-            <name>Sonatype Nexus Staging</name>
-            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
-        </repository>
-    </distributionManagement>
-</project>
-
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java
new file mode 100644
index 0000000..fa7cb74
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/ExtendedBrokerApi.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.function.Consumer;
+
+public interface ExtendedBrokerApi extends BrokerApi {
+
+  /**
+   * Asynchronously receive a message from a topic using a consumer's group id.
+   *
+   * @param topic topic name
+   * @param groupId the group identifier that the consumer belongs to for that topic
+   * @param consumer an operation that accepts and processes a single message
+   */
+  void receiveAsync(String topic, String groupId, Consumer<Event> consumer);
+
+  /**
+   * Get the active subscribers with their consumer's group id.
+   *
+   * @return {@link Set} of the topic subscribers using a consumer's group id.
+   */
+  Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId();
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index a039d24..804a2fd 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -15,65 +15,40 @@
 package com.gerritforge.gerrit.eventbroker;
 
 import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId.topicSubscriberWithGroupId;
 
-import com.google.common.collect.EvictingQueue;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapMaker;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.server.events.Event;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
 
-public class InProcessBrokerApi implements BrokerApi {
+public class InProcessBrokerApi implements ExtendedBrokerApi {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
-  private static final Integer DEFAULT_MESSAGE_QUEUE_SIZE = 100;
-
-  private final Map<String, EvictingQueue<Event>> messagesQueueMap;
-  private final Map<String, EventBus> eventBusMap;
   private final Set<TopicSubscriber> topicSubscribers;
+  private final Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId;
 
   public InProcessBrokerApi() {
-    this.eventBusMap = new MapMaker().concurrencyLevel(1).makeMap();
-    this.messagesQueueMap = new MapMaker().concurrencyLevel(1).makeMap();
     this.topicSubscribers = new HashSet<>();
+    this.topicSubscribersWithGroupId = new HashSet<>();
   }
 
   @Override
   public ListenableFuture<Boolean> send(String topic, Event message) {
-    EventBus topicEventConsumers = eventBusMap.get(topic);
-    SettableFuture<Boolean> future = SettableFuture.create();
-
-    if (topicEventConsumers != null) {
-      topicEventConsumers.post(message);
-      future.set(true);
-    } else {
-      future.set(false);
-    }
-
-    return future;
+    return unsupported();
   }
 
   @Override
   public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
-    EventBus topicEventConsumers = eventBusMap.get(topic);
-    if (topicEventConsumers == null) {
-      topicEventConsumers = new EventBus(topic);
-      eventBusMap.put(topic, topicEventConsumers);
-    }
-
-    topicEventConsumers.register(eventConsumer);
     topicSubscribers.add(topicSubscriber(topic, eventConsumer));
+  }
 
-    EvictingQueue<Event> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
-    messagesQueueMap.put(topic, messageQueue);
-    topicEventConsumers.register(new EventBusMessageRecorder(messageQueue));
+  @Override
+  public void receiveAsync(String topic, String groupId, Consumer<Event> eventConsumer) {
+    topicSubscribersWithGroupId.add(
+        topicSubscriberWithGroupId(groupId, topicSubscriber(topic, eventConsumer)));
   }
 
   @Override
@@ -82,29 +57,23 @@
   }
 
   @Override
+  public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+    return ImmutableSet.copyOf(topicSubscribersWithGroupId);
+  }
+
+  @Override
   public void disconnect() {
-    this.eventBusMap.clear();
+    this.topicSubscribers.clear();
+    this.topicSubscribersWithGroupId.clear();
   }
 
   @Override
   public void replayAllEvents(String topic) {
-    if (messagesQueueMap.containsKey(topic)) {
-      messagesQueueMap.get(topic).stream().forEach(eventMessage -> send(topic, eventMessage));
-    }
+    unsupported();
   }
 
-  private static class EventBusMessageRecorder {
-    private EvictingQueue messagesQueue;
-
-    public EventBusMessageRecorder(EvictingQueue messagesQueue) {
-      this.messagesQueue = messagesQueue;
-    }
-
-    @Subscribe
-    public void recordCustomerChange(Event e) {
-      if (!messagesQueue.contains(e)) {
-        messagesQueue.add(e);
-      }
-    }
+  private <T> T unsupported() {
+    throw new UnsupportedOperationException(
+        "InProcessBrokerApi is not intended to be used as a real broker");
   }
 }
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java
new file mode 100644
index 0000000..3ce2d56
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriberWithGroupId.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class TopicSubscriberWithGroupId {
+  public static TopicSubscriberWithGroupId topicSubscriberWithGroupId(
+      String groupId, TopicSubscriber topicSubscriber) {
+    return new AutoValue_TopicSubscriberWithGroupId(groupId, topicSubscriber);
+  }
+
+  public abstract String groupId();
+
+  public abstract TopicSubscriber topicSubscriber();
+}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
deleted file mode 100644
index 5f6c73d..0000000
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.gerrit.eventbroker;
-
-import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.eventbus.Subscribe;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectCreatedEvent;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BrokerApiTest {
-
-  public static final int SEND_FUTURE_TIMEOUT = 1;
-  @Captor ArgumentCaptor<Event> eventCaptor;
-  Consumer<Event> eventConsumer;
-
-  BrokerApi brokerApiUnderTest;
-  UUID instanceId = UUID.randomUUID();
-  private Gson gson = new Gson();
-
-  @Before
-  public void setup() {
-    brokerApiUnderTest = new InProcessBrokerApi();
-    eventConsumer = mockEventConsumer();
-  }
-
-  @Test
-  public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
-    ProjectCreatedEvent event = new ProjectCreatedEvent();
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
-    assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
-  }
-
-  private Event wrap(ProjectCreatedEvent event) {
-    return event;
-  }
-
-  @Test
-  public void shouldRegisterConsumerPerTopic()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    Consumer<Event> secondConsumer = mockEventConsumer();
-    ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
-    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-    ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-    brokerApiUnderTest
-        .send("topic2", wrap(eventForTopic2))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
-    compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
-  }
-
-  @Test
-  public void shouldReturnMapOfConsumersPerTopic() {
-    Consumer<Event> firstConsumerTopicA = mockEventConsumer();
-
-    Consumer<Event> secondConsumerTopicA = mockEventConsumer();
-    Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
-
-    brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
-    brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
-    brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
-
-    Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
-
-    assertThat(consumersMap).isNotNull();
-    assertThat(consumersMap).isNotEmpty();
-    assertThat(consumersMap)
-        .containsExactly(
-            topicSubscriber("TopicA", firstConsumerTopicA),
-            topicSubscriber("TopicA", secondConsumerTopicA),
-            topicSubscriber("TopicB", thirdConsumerTopicB));
-  }
-
-  @Test
-  public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    Consumer<Event> secondConsumer = mockEventConsumer();
-    ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
-
-    ProjectCreatedEvent event = testProjectCreatedEvent("Project name");
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.receiveAsync("topic", secondConsumer);
-    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
-    compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
-  }
-
-  @Test
-  public void shouldReceiveEventsOnlyFromRegisteredTopic()
-      throws InterruptedException, TimeoutException, ExecutionException {
-
-    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
-    ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-    brokerApiUnderTest
-        .send("topic2", wrap(eventForTopic2))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
-  }
-
-  @Test
-  public void shouldNotRegisterTheSameConsumerTwicePerTopic()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    ProjectCreatedEvent event = new ProjectCreatedEvent();
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
-  }
-
-  @Test
-  public void shouldReconnectSubscribers()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
-    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
-
-    Consumer<Event> newConsumer = mockEventConsumer();
-
-    clearInvocations(eventConsumer);
-
-    brokerApiUnderTest.disconnect();
-    brokerApiUnderTest.receiveAsync("topic", newConsumer);
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
-    verify(eventConsumer, never()).accept(eventCaptor.capture());
-  }
-
-  @Test
-  public void shouldDisconnectSubscribers()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.disconnect();
-
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    verify(eventConsumer, never()).accept(eventCaptor.capture());
-  }
-
-  @Test
-  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
-
-    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
-
-    BrokerApi secondaryBroker = new InProcessBrokerApi();
-    brokerApiUnderTest.disconnect();
-    secondaryBroker.receiveAsync("topic", eventConsumer);
-
-    clearInvocations(eventConsumer);
-
-    brokerApiUnderTest
-        .send("topic", wrap(eventForTopic))
-        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-    verify(eventConsumer, never()).accept(eventCaptor.capture());
-
-    clearInvocations(eventConsumer);
-    secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
-    compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
-  }
-
-  @Test
-  public void shouldReplayAllEvents()
-      throws InterruptedException, TimeoutException, ExecutionException {
-    ProjectCreatedEvent event = new ProjectCreatedEvent();
-
-    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-
-    assertThat(
-            brokerApiUnderTest
-                .send("topic", wrap(event))
-                .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
-        .isTrue();
-
-    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
-    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
-    reset(eventConsumer);
-
-    brokerApiUnderTest.replayAllEvents("topic");
-    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
-    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
-  }
-
-  @Test
-  public void shouldSkipReplayAllEventsWhenTopicDoesNotExists() {
-    brokerApiUnderTest.replayAllEvents("unexistentTopic");
-    verify(eventConsumer, times(0)).accept(eventCaptor.capture());
-  }
-
-  private ProjectCreatedEvent testProjectCreatedEvent(String s) {
-    ProjectCreatedEvent eventForTopic = new ProjectCreatedEvent();
-    eventForTopic.projectName = s;
-    return eventForTopic;
-  }
-
-  private interface Subscriber extends Consumer<Event> {
-
-    @Override
-    @Subscribe
-    void accept(Event eventMessage);
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T> Consumer<T> mockEventConsumer() {
-    return (Consumer<T>) Mockito.mock(Subscriber.class);
-  }
-
-  private void compareWithExpectedEvent(
-      Consumer<Event> eventConsumer, ArgumentCaptor<Event> eventCaptor, Event expectedEvent) {
-    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
-    assertThat(eventCaptor.getValue()).isEqualTo(expectedEvent);
-  }
-
-  private JsonObject eventToJson(Event event) {
-    return gson.toJsonTree(event).getAsJsonObject();
-  }
-}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
new file mode 100644
index 0000000..eee2bdf
--- /dev/null
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApiTest.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InProcessBrokerApiTest {
+
+  public static final int SEND_FUTURE_TIMEOUT = 1;
+  Consumer<Event> eventConsumer;
+
+  BrokerApi brokerApiUnderTest;
+  UUID instanceId = UUID.randomUUID();
+
+  @Before
+  public void setup() {
+    brokerApiUnderTest = new InProcessBrokerApi();
+    eventConsumer = mockEventConsumer();
+  }
+
+  @Test
+  public void sendEventShouldNotBeSupported() {
+    assertThrows(UnsupportedOperationException.class, () -> brokerApiUnderTest.send("topic", null));
+  }
+
+  @Test
+  public void shouldRegisterConsumerPerTopic() {
+    Consumer<Event> secondConsumer = mockEventConsumer();
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
+    assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+  }
+
+  @Test
+  public void shouldReturnMapOfConsumersPerTopic() {
+    Consumer<Event> firstConsumerTopicA = mockEventConsumer();
+
+    Consumer<Event> secondConsumerTopicA = mockEventConsumer();
+    Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
+
+    brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
+    brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
+    brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
+
+    Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
+
+    assertThat(consumersMap).isNotNull();
+    assertThat(consumersMap).isNotEmpty();
+    assertThat(consumersMap)
+        .containsExactly(
+            topicSubscriber("TopicA", firstConsumerTopicA),
+            topicSubscriber("TopicA", secondConsumerTopicA),
+            topicSubscriber("TopicB", thirdConsumerTopicB));
+  }
+
+  @Test
+  public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers() {
+    Consumer<Event> secondConsumer = mockEventConsumer();
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic", secondConsumer);
+    assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(2);
+  }
+
+  @Test
+  public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+
+    assertThat(brokerApiUnderTest.topicSubscribers().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldReconnectSubscribers() {
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+
+    Consumer<Event> newConsumer = mockEventConsumer();
+
+    brokerApiUnderTest.disconnect();
+    assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+
+    brokerApiUnderTest.receiveAsync("topic", newConsumer);
+    assertThat(brokerApiUnderTest.topicSubscribers()).isNotEmpty();
+  }
+
+  @Test
+  public void shouldDisconnectSubscribers() {
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.disconnect();
+    assertThat(brokerApiUnderTest.topicSubscribers()).isEmpty();
+  }
+
+  @Test
+  public void replayAllEventsShouldNotBeSupported() {
+    assertThrows(
+        UnsupportedOperationException.class, () -> brokerApiUnderTest.replayAllEvents("topic"));
+  }
+
+  private static class Subscriber<T> implements Consumer<T> {
+
+    @Override
+    public void accept(T eventMessage) {}
+  }
+
+  private <T> Consumer<T> mockEventConsumer() {
+    return new Subscriber<>();
+  }
+}