Merge branch 'stable-3.2' into stable-3.3

* stable-3.2:
  Set to Gerrit v3.2.6
  Set to Gerrit v3.1.11
  Initial events-broker implementation for Gerrit v3.0.x

Change-Id: I36f52f9501d10811a59a739c802ead4785d4db1d
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..16f1fc2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,30 @@
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# Maven target
+target
+
+# IntelliJ
+.idea
+*.iml
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..6ac0fac
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,26 @@
+package(default_visibility = ["//visibility:public"])
+
+load("@rules_java//java:defs.bzl", "java_library")
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+    "//tools/bzl:plugin.bzl",
+    "PLUGIN_DEPS",
+    "PLUGIN_DEPS_NEVERLINK",
+    "PLUGIN_TEST_DEPS",
+)
+
+java_library(
+    name = "events-broker",
+    srcs = glob(["src/main/java/**/*.java"]),
+    deps = PLUGIN_DEPS_NEVERLINK,
+)
+
+junit_tests(
+    name = "events_broker_tests",
+    size = "small",
+    srcs = glob(["src/test/java/**/*.java"]),
+    tags = ["events-broker"],
+    deps = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+        ":events-broker",
+    ],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..a55ac4e
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,2 @@
+pluginPipeline(formatCheckId: 'gerritforge:events-broker-format',
+               buildCheckId: 'gerritforge:events-broker-build')
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..5541da1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,6 @@
+# Events Broker API for Gerrit Code Review
+
+API of a generic events broker for use with Gerrit Code Review.
+
+Enables the decoupling of Gerrit and its plugins from the different implementations
+of a generic events broker.
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..a67801a
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,170 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.gerritforge</groupId>
+    <artifactId>events-broker</artifactId>
+    <version>3.2.6</version>
+    <packaging>jar</packaging>
+
+    <name>events-broker</name>
+    <description>Events Broker API for use with Gerrit Code Review</description>
+    <url>https://gerrit.googlesource.com/modules/events-broker</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm> <!-- connection must be a Maven SCM URL of the form scm:<provider>:<location> -->
+        <url>https://gerrit.googlesource.com/modules/events-broker</url>
+        <connection>scm:git:https://gerrit.googlesource.com/modules/events-broker.git</connection>
+    </scm>
+
+    <developers>
+        <developer>
+            <name>Luca Milanesio</name>
+        </developer>
+        <developer>
+            <name>Marcin Czech</name>
+        </developer>
+        <developer>
+            <name>Antonio Barone</name>
+        </developer>
+    </developers>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <gerrit.version>3.2.6</gerrit.version>
+        <auto-value.version>1.7.4</auto-value.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.gerrit</groupId>
+            <artifactId>gerrit-plugin-api</artifactId>
+            <version>${gerrit.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.gerrit</groupId>
+            <artifactId>gerrit-acceptance-framework</artifactId>
+            <version>${gerrit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>1.61</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <annotationProcessorPaths>
+                        <path>
+                            <groupId>com.google.auto.value</groupId>
+                            <artifactId>auto-value</artifactId>
+                            <version>${auto-value.version}</version>
+                        </path>
+                    </annotationProcessorPaths>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.1.0</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>3.1.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-release-plugin</artifactId>
+                <version>2.5.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-gpg-plugin</artifactId>
+                <version>1.6</version>
+                <executions>
+                    <execution>
+                        <id>sign-artifacts</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>sign</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.theoryinpractise</groupId>
+                <artifactId>googleformatter-maven-plugin</artifactId>
+                <version>1.7.3</version>
+                <executions>
+                    <execution>
+                        <id>reformat-sources</id>
+                        <configuration>
+                            <includeStale>false</includeStale>
+                            <style>GOOGLE</style>
+                            <formatMain>true</formatMain>
+                            <formatTest>true</formatTest>
+                            <filterModified>false</filterModified>
+                            <skip>false</skip>
+                            <fixImports>true</fixImports>
+                            <maxLineLength>100</maxLineLength>
+                        </configuration>
+                        <goals>
+                            <goal>format</goal>
+                        </goals>
+                        <phase>process-sources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <distributionManagement>
+        <repository>
+            <id>sonatype-nexus-staging</id>
+            <name>Sonatype Nexus Staging</name>
+            <url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
+        </repository>
+    </distributionManagement>
+</project>
+
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
new file mode 100644
index 0000000..a74bdc2
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
@@ -0,0 +1,71 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.server.events.Event;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/** API for sending/receiving events through a message Broker. */
+public interface BrokerApi {
+
+  /**
+   * Creates an {@link EventMessage} for an event, with a randomly generated event id.
+   *
+   * @param instanceId {@link UUID} of the Gerrit instance originating the event
+   * @param event Gerrit event
+   * @return {@link EventMessage} object
+   */
+  default EventMessage newMessage(UUID instanceId, Event event) {
+
+    return new EventMessage(new Header(UUID.randomUUID(), instanceId), event);
+  }
+
+  /**
+   * Send a message to a topic.
+   *
+   * @param topic topic name
+   * @param message to be sent to the topic
+   * @return true if the message was successfully sent. False otherwise.
+   */
+  boolean send(String topic, EventMessage message);
+
+  /**
+   * Asynchronously receive a message from a topic.
+   *
+   * @param topic topic name
+   * @param consumer an operation that accepts and processes a single message
+   */
+  void receiveAsync(String topic, Consumer<EventMessage> consumer);
+
+  /**
+   * Get the active subscribers
+   *
+   * @return {@link Set} of the topic subscribers
+   */
+  Set<TopicSubscriber> topicSubscribers();
+
+  /** Disconnect from broker and cancel all active consumers */
+  void disconnect();
+
+  /**
+   * Redeliver all stored messages for the specified topic
+   *
+   * @param topic topic name
+   */
+  void replayAllEvents(String topic);
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
new file mode 100644
index 0000000..ae6f216
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+
+public class BrokerApiModule extends AbstractModule {
+  DynamicItem<BrokerApi> currentBrokerApi; // stays null unless setPreviousBrokerApi is called
+
+  @Inject(optional = true)
+  public void setPreviousBrokerApi(DynamicItem<BrokerApi> currentBrokerApi) {
+    this.currentBrokerApi = currentBrokerApi;
+  }
+
+  @Override
+  protected void configure() {
+    if (currentBrokerApi == null) { // no existing item: declare it, bind the in-process default
+      DynamicItem.itemOf(binder(), BrokerApi.class);
+      DynamicItem.bind(binder(), BrokerApi.class).to(InProcessBrokerApi.class).in(Scopes.SINGLETON);
+    }
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/EventGsonProvider.java b/src/main/java/com/gerritforge/gerrit/eventbroker/EventGsonProvider.java
new file mode 100644
index 0000000..bf55621
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/EventGsonProvider.java
@@ -0,0 +1,85 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.common.base.Supplier;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.change.ChangeKeyAdapter;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDeserializer;
+import com.google.gerrit.server.events.EventTypes;
+import com.google.gerrit.server.events.ProjectNameKeyAdapter;
+import com.google.gerrit.server.events.SupplierDeserializer;
+import com.google.gerrit.server.events.SupplierSerializer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.google.inject.Provider;
+import java.lang.reflect.Type;
+
+public class EventGsonProvider implements Provider<Gson> {
+
+  public static class EventSerializer implements JsonSerializer<Event> {
+    // Serializes an event as the concrete class that EventTypes maps its type string to.
+    @Override
+    public JsonElement serialize(Event src, Type typeOfSrc, JsonSerializationContext context) {
+      String type = src.getType();
+
+      Class<?> cls = EventTypes.getClass(type);
+      if (cls == null) {
+        throw new JsonParseException("Unknown event type: " + type);
+      }
+
+      return context.serialize(src, cls);
+    }
+  }
+
+  public class ProjectNameKeyDeserializer implements JsonDeserializer<Project.NameKey> {
+    // NOTE(review): not registered by get() below (ProjectNameKeyAdapter is) - confirm usage.
+    @Override
+    public Project.NameKey deserialize(
+        JsonElement json, Type type, JsonDeserializationContext context) throws JsonParseException {
+      if (!json.isJsonPrimitive()) {
+        throw new JsonParseException("Not a primitive type");
+      }
+
+      JsonPrimitive jsonPrimitive = (JsonPrimitive) json;
+      if (!jsonPrimitive.isString()) {
+        throw new JsonParseException("Not a string");
+      }
+
+      return Project.nameKey(jsonPrimitive.getAsString());
+    }
+  }
+
+  @Override
+  public Gson get() { // builds a new Gson instance on every call
+    return new GsonBuilder()
+        .registerTypeAdapter(Event.class, new EventDeserializer())
+        .registerTypeAdapter(Event.class, new EventSerializer())
+        .registerTypeAdapter(Supplier.class, new SupplierSerializer())
+        .registerTypeAdapter(Supplier.class, new SupplierDeserializer())
+        .registerTypeAdapter(Change.Key.class, new ChangeKeyAdapter())
+        .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeyAdapter())
+        .create();
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java b/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java
new file mode 100644
index 0000000..204173e
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.gerrit.server.events.Event;
+import java.util.UUID;
+
+/**
+ * Enrich an existing {@link Event} object with the information about the source that produced it,
+ * including the Gerrit server instance id. Additionally the header carries an event id and a
+ * deprecated event-type field.
+ */
+public class EventMessage {
+
+  private final Header header;
+  private final Event body;
+
+  public Header getHeader() {
+    return header;
+  }
+
+  /**
+   * Returns the {@code Event} carried as the body of this message
+   *
+   * @return {@code Event} class instance
+   */
+  public Event getEvent() {
+    return body;
+  }
+
+  /** Contains all additional information required to successfully send an {@code Event} object. */
+  public static class Header {
+    /** Unique event id. */
+    public final UUID eventId;
+
+    /** Gerrit server instance id from which event was sent. */
+    public final UUID sourceInstanceId;
+
+    /** @deprecated required for interoperability with older JSON wire protocols */
+    public final String eventType;
+
+    public Header(UUID eventId, UUID sourceInstanceId) {
+      this.eventId = eventId;
+      this.sourceInstanceId = sourceInstanceId;
+      this.eventType = ""; // this constructor always leaves the deprecated field empty
+    }
+
+    /** Validate that all required header fields are not null. */
+    public void validate() {
+      requireNonNull(eventId, "EventId cannot be null");
+      requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
+    }
+
+    @Override
+    public String toString() {
+      return "{" + "eventId=" + eventId + ", sourceInstanceId=" + sourceInstanceId + '}';
+    }
+  }
+
+  /**
+   * Creates a new instance which can be sent as a message via {@link BrokerApi}.
+   *
+   * @param header message header object, contains all additional information required to properly
+   *     send the message
+   * @param event {@link Event} object
+   */
+  public EventMessage(Header header, Event event) {
+    this.header = header;
+    this.body = event;
+  }
+
+  /** Validate that the header, the event and all required header fields are not null. */
+  public void validate() {
+    requireNonNull(header, "Header cannot be null");
+    requireNonNull(body, "Event cannot be null");
+    header.validate();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Header='%s', Body='%s'", header, body);
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
new file mode 100644
index 0000000..894e4ed
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -0,0 +1,106 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapMaker;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/** In-process {@link BrokerApi} dispatching over one Guava {@link EventBus} per topic. */
+public class InProcessBrokerApi implements BrokerApi {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private static final int DEFAULT_MESSAGE_QUEUE_SIZE = 100;
+
+  private final Map<String, EvictingQueue<EventMessage>> messagesQueueMap =
+      new MapMaker().concurrencyLevel(1).makeMap();
+  private final Map<String, EventBus> eventBusMap = new MapMaker().concurrencyLevel(1).makeMap();
+  private final Set<TopicSubscriber> topicSubscribers = new HashSet<>();
+
+  @Override
+  public boolean send(String topic, EventMessage message) {
+    EventBus topicEventConsumers = eventBusMap.get(topic);
+    try {
+      if (topicEventConsumers != null) {
+        topicEventConsumers.post(message);
+      }
+    } catch (RuntimeException e) {
+      log.atSevere().withCause(e).log("Unable to send message to topic %s", topic);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+    // Bus, recorder and replay queue are set up once per topic, so subscribing another
+    // consumer neither replaces the recorded messages nor registers one more recorder.
+    eventBusMap.computeIfAbsent(topic, this::newRecordingEventBus).register(eventConsumer);
+    topicSubscribers.add(topicSubscriber(topic, eventConsumer));
+  }
+
+  /** Builds the bus of {@code topic} along with a fresh replay queue and its recorder. */
+  private EventBus newRecordingEventBus(String topic) {
+    EvictingQueue<EventMessage> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
+    messagesQueueMap.put(topic, messageQueue);
+    EventBus topicEventBus = new EventBus(topic);
+    topicEventBus.register(new EventBusMessageRecorder(messageQueue));
+    return topicEventBus;
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return ImmutableSet.copyOf(topicSubscribers);
+  }
+
+  /** Drops all topic buses; {@link #topicSubscribers()} still lists the former subscribers. */
+  @Override
+  public void disconnect() {
+    this.eventBusMap.clear();
+  }
+
+  /** Re-sends, through {@link #send}, the messages recorded for {@code topic}, if any. */
+  @Override
+  public void replayAllEvents(String topic) {
+    if (messagesQueueMap.containsKey(topic)) {
+      messagesQueueMap.get(topic).stream().forEach(eventMessage -> send(topic, eventMessage));
+    }
+  }
+
+  /** Bus subscriber that appends each posted message to the queue unless already present. */
+  private static class EventBusMessageRecorder {
+    private final EvictingQueue<EventMessage> messagesQueue;
+
+    public EventBusMessageRecorder(EvictingQueue<EventMessage> messagesQueue) {
+      this.messagesQueue = messagesQueue;
+    }
+
+    @Subscribe
+    public void recordCustomerChange(EventMessage e) {
+      if (!messagesQueue.contains(e)) {
+        messagesQueue.add(e);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java
new file mode 100644
index 0000000..afe09f2
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import com.google.auto.value.AutoValue;
+import java.util.function.Consumer;
+
+@AutoValue
+public abstract class TopicSubscriber {
+  public abstract String topic(); // topic the consumer is subscribed to
+
+  public abstract Consumer<EventMessage> consumer(); // receives the messages of that topic
+
+  public static TopicSubscriber topicSubscriber(String topic, Consumer<EventMessage> consumer) {
+    return new AutoValue_TopicSubscriber(topic, consumer);
+  }
+}
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
new file mode 100644
index 0000000..dd0edfa
--- /dev/null
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
@@ -0,0 +1,258 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerApiTest {
+  // Exercises the BrokerApi operations against the InProcessBrokerApi implementation.
+  @Captor ArgumentCaptor<EventMessage> eventCaptor;
+  Consumer<EventMessage> eventConsumer;
+
+  BrokerApi brokerApiUnderTest;
+  UUID instanceId = UUID.randomUUID();
+  private Gson gson = new Gson();
+
+  @Before
+  public void setup() {
+    brokerApiUnderTest = new InProcessBrokerApi();
+    eventConsumer = mockEventConsumer();
+  }
+
+  @Test
+  public void shouldSendEvent() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    // The send must succeed and the consumer must get the event exactly once.
+    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
+  }
+
+  private EventMessage wrap(ProjectCreatedEvent event) {
+    return brokerApiUnderTest.newMessage(instanceId, event);
+  }
+
+  @Test
+  public void shouldRegisterConsumerPerTopic() {
+    Consumer<EventMessage> secondConsumer = mockEventConsumer();
+    ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+
+    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
+    ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    // Each consumer gets only the event sent to its own topic.
+    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
+    compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
+  }
+
+  @Test
+  public void shouldReturnMapOfConsumersPerTopic() {
+    Consumer<EventMessage> firstConsumerTopicA = mockEventConsumer();
+
+    Consumer<EventMessage> secondConsumerTopicA = mockEventConsumer();
+    Consumer<EventMessage> thirdConsumerTopicB = mockEventConsumer();
+
+    brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
+    brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
+    brokerApiUnderTest.receiveAsync("TopicB", thirdConsumerTopicB);
+
+    Set<TopicSubscriber> consumersMap = brokerApiUnderTest.topicSubscribers();
+    // All three registrations are reported, including both consumers of TopicA.
+    assertThat(consumersMap).isNotNull();
+    assertThat(consumersMap).isNotEmpty();
+    assertThat(consumersMap)
+        .containsExactly(
+            topicSubscriber("TopicA", firstConsumerTopicA),
+            topicSubscriber("TopicA", secondConsumerTopicA),
+            topicSubscriber("TopicB", thirdConsumerTopicB));
+  }
+
+  @Test
+  public void shouldDeliverEventToAllRegisteredConsumers() {
+    Consumer<EventMessage> secondConsumer = mockEventConsumer();
+    ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+
+    ProjectCreatedEvent event = testProjectCreatedEvent("Project name");
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic", secondConsumer);
+    brokerApiUnderTest.send("topic", wrap(event));
+    // A single send reaches both consumers of the topic.
+    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
+    compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
+  }
+
+  @Test
+  public void shouldReceiveEventsOnlyFromRegisteredTopic() {
+
+    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
+
+    ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    // Exactly one delivery: the event sent to "topic2" does not reach this consumer.
+    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
+  }
+
+  @Test
+  public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.send("topic", wrap(event));
+    // Registering twice must not lead to the event being delivered twice.
+    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
+  }
+
+  @Test
+  public void shouldReconnectSubscribers() {
+    ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+
+    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+
+    compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
+
+    Consumer<EventMessage> newConsumer = mockEventConsumer();
+    // Forget the delivery verified above so that never() below covers only what follows.
+    clearInvocations(eventConsumer);
+
+    brokerApiUnderTest.disconnect();
+    brokerApiUnderTest.receiveAsync("topic", newConsumer);
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    // After the disconnect only the newly registered consumer gets the event.
+    compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
+    verify(eventConsumer, never()).accept(eventCaptor.capture());
+  }
+
+  @Test
+  public void shouldDisconnectSubscribers() {
+    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+    brokerApiUnderTest.disconnect();
+
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    // Nothing is delivered to a consumer registered before the disconnect.
+    verify(eventConsumer, never()).accept(eventCaptor.capture());
+  }
+
+  @Test
+  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers() {
+    ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+
+    ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
+
+    BrokerApi secondaryBroker = new InProcessBrokerApi();
+    brokerApiUnderTest.disconnect();
+    secondaryBroker.receiveAsync("topic", eventConsumer);
+
+    clearInvocations(eventConsumer);
+    // The consumer is registered on the secondary broker only; the first must not reach it.
+    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    verify(eventConsumer, never()).accept(eventCaptor.capture());
+    // Through the secondary broker, however, the same event is delivered.
+    clearInvocations(eventConsumer);
+    secondaryBroker.send("topic", wrap(eventForTopic));
+
+    compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
+  }
+
+  @Test
+  public void shouldReplayAllEvents() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+
+    brokerApiUnderTest.receiveAsync("topic", eventConsumer);
+
+    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+
+    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
+    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
+    reset(eventConsumer);
+    // The replay must hand the recorded event to the consumer once more.
+    brokerApiUnderTest.replayAllEvents("topic");
+    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
+    compareWithExpectedEvent(eventConsumer, eventCaptor, event);
+  }
+
+  @Test
+  public void shouldSkipReplayAllEventsWhenTopicDoesNotExists() {
+    brokerApiUnderTest.replayAllEvents("unexistentTopic");
+    verify(eventConsumer, times(0)).accept(eventCaptor.capture());
+  }
+
+  private ProjectCreatedEvent testProjectCreatedEvent(String s) {
+    ProjectCreatedEvent eventForTopic = new ProjectCreatedEvent();
+    eventForTopic.projectName = s;
+    return eventForTopic;
+  }
+
+  private interface Subscriber extends Consumer<EventMessage> {
+    // accept is re-declared with @Subscribe, presumably so that EventBus dispatches to the mocks.
+    @Override
+    @Subscribe
+    void accept(EventMessage eventMessage);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Consumer<T> mockEventConsumer() {
+    return (Consumer<T>) Mockito.mock(Subscriber.class);
+  }
+
+  private void compareWithExpectedEvent(
+      Consumer<EventMessage> eventConsumer,
+      ArgumentCaptor<EventMessage> eventCaptor,
+      Event expectedEvent) {
+    verify(eventConsumer, times(1)).accept(eventCaptor.capture());
+    assertThat(eventCaptor.getValue().getEvent()).isEqualTo(expectedEvent);
+  }
+
+  private JsonObject eventToJson(Event event) {
+    return gson.toJsonTree(event).getAsJsonObject();
+  }
+}