Initial stable-3.0 commit from kafka-events repo

Change-Id: I4b774850a16849456d5a1ca9558667dce76a1f85
diff --git a/BUILD b/BUILD
index ba9f702..c0eab0c 100644
--- a/BUILD
+++ b/BUILD
@@ -17,7 +17,8 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-        "@kafka_client//jar",
+        "@kafka-client//jar",
+        "@events-broker//jar",
     ],
 )
 
@@ -28,7 +29,8 @@
     deps = [
         ":events-kafka__plugin_test_deps",
         "//lib/testcontainers",
-        "@kafka_client//jar",
+        "@kafka-client//jar",
+        "@events-broker//jar",
         "@testcontainers-kafka//jar",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 197cbed..dbcf708 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -2,7 +2,7 @@
 
 def external_plugin_deps():
     maven_jar(
-        name = "kafka_client",
+        name = "kafka-client",
         artifact = "org.apache.kafka:kafka-clients:2.1.0",
         sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
     )
@@ -12,3 +12,9 @@
         artifact = "org.testcontainers:kafka:1.15.0",
         sha1 = "d34760b11ab656e08b72c1e2e9b852f037a89f90",
     )
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.0.5",
+        sha1 = "7abf72d2252f975baff666fbbf28b7036767aa81",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index b20b312..4dc394b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -14,28 +14,39 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import java.util.Set;
 
 @Singleton
 public class Manager implements LifecycleListener {
 
   private final KafkaPublisher publisher;
+  private final Set<TopicSubscriber> consumers;
+  private final BrokerApi brokerApi;
 
   @Inject
-  public Manager(KafkaPublisher publisher) {
+  public Manager(KafkaPublisher publisher, Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
     this.publisher = publisher;
+    this.consumers = consumers;
+    this.brokerApi = brokerApi;
   }
 
   @Override
   public void start() {
     publisher.start();
+    consumers.forEach(
+        topicSubscriber ->
+            brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
   }
 
   @Override
   public void stop() {
     publisher.stop();
+    brokerApi.disconnect();
   }
 }
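
For orientation, a minimal sketch of what the new `start()` wiring does, assuming a `BrokerApi` and a set of subscribers carried over from a previous plugin load (the class and method names below are illustrative only):

```java
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import java.util.Set;

class ResumeSketch {
  // Mirrors Manager.start(): every TopicSubscriber that survived a plugin
  // reload is re-attached to its topic on the (new) BrokerApi instance.
  static void resume(BrokerApi brokerApi, Set<TopicSubscriber> consumers) {
    consumers.forEach(s -> brokerApi.receiveAsync(s.topic(), s.consumer()));
  }
}
```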
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 7f219d3..1be52cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,21 +14,38 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.kafka.publish.GsonProvider;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import org.apache.kafka.clients.producer.KafkaProducer;
 
 class Module extends AbstractModule {
 
+  private final KafkaApiModule kafkaBrokerModule;
+
+  @Inject
+  public Module(KafkaApiModule kafkaBrokerModule) {
+    this.kafkaBrokerModule = kafkaBrokerModule;
+  }
+
   @Override
   protected void configure() {
-    bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
+    bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
     DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+
+    bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+        .toProvider(KafkaProducerProvider.class);
+
+    install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
new file mode 100644
index 0000000..73c7509
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -0,0 +1,69 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventDeserializer;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+public class KafkaApiModule extends LifecycleModule {
+  private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+  private WorkQueue workQueue;
+  private KafkaSubscriberProperties configuration;
+
+  @Inject
+  public KafkaApiModule(WorkQueue workQueue, KafkaSubscriberProperties configuration) {
+    this.workQueue = workQueue;
+    this.configuration = configuration;
+  }
+
+  @Inject(optional = true)
+  public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+    if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+      this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+    }
+  }
+
+  @Override
+  protected void configure() {
+
+    bind(ExecutorService.class)
+        .annotatedWith(ConsumerExecutor.class)
+        .toInstance(
+            workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber"));
+
+    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+    bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class);
+    bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+  }
+}
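
Since `BrokerApi` is exposed through a `DynamicItem`, other plugins can consume it without depending on Kafka directly. A hypothetical consumer-side sketch (the topic name and class are assumptions, not part of this change):

```java
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.inject.Inject;

class StreamEventsListener {
  @Inject
  StreamEventsListener(DynamicItem<BrokerApi> brokerApi) {
    // Subscribe to the (assumed) "gerrit" topic; the consumer runs on the
    // "kafka-subscriber" work queue created by KafkaApiModule above.
    brokerApi.get().receiveAsync("gerrit", event -> {
      // react to the EventMessage here
    });
  }
}
```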
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
new file mode 100644
index 0000000..9a7c66a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class KafkaBrokerApi implements BrokerApi {
+
+  private final KafkaPublisher publisher;
+  private final Provider<KafkaEventSubscriber> subscriberProvider;
+  private List<KafkaEventSubscriber> subscribers;
+
+  @Inject
+  public KafkaBrokerApi(
+      KafkaPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
+    this.publisher = publisher;
+    this.subscriberProvider = subscriberProvider;
+    subscribers = new ArrayList<>();
+  }
+
+  @Override
+  public boolean send(String topic, EventMessage event) {
+    return publisher.publish(topic, event);
+  }
+
+  @Override
+  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+    KafkaEventSubscriber subscriber = subscriberProvider.get();
+    synchronized (subscribers) {
+      subscribers.add(subscriber);
+    }
+    subscriber.subscribe(topic, eventConsumer);
+  }
+
+  @Override
+  public void disconnect() {
+    for (KafkaEventSubscriber subscriber : subscribers) {
+      subscriber.shutdown();
+    }
+    subscribers.clear();
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return subscribers.stream()
+        .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
+  public void replayAllEvents(String topic) {
+    subscribers.stream()
+        .filter(subscriber -> topic.equals(subscriber.getTopic()))
+        .forEach(subscriber -> subscriber.resetOffset());
+  }
+}
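
A short usage sketch of the API surface implemented here, under the assumption of a `gerrit` topic: `send` reports per-message success, and `replayAllEvents` rewinds every matching subscriber to the beginning of the topic.

```java
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;

class BrokerApiUsageSketch {
  static void publishAndReplay(BrokerApi api, EventMessage event) {
    boolean acked = api.send("gerrit", event); // false on publish failure
    if (!acked) {
      // caller decides: retry, log, alert...
    }
    api.replayAllEvents("gerrit"); // seekToBeginning on all "gerrit" subscribers
  }
}
```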
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java
new file mode 100644
index 0000000..eb878fa
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index c01e34e..72d7f91 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kafka.config;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.gerrit.extensions.annotations.PluginName;
@@ -33,6 +34,7 @@
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
   private final String topic;
+  private final boolean sendAsync;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -40,10 +42,20 @@
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
     topic = fromGerritConfig.getString("topic", "gerrit");
+    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
 
+  @VisibleForTesting
+  public KafkaProperties(boolean sendAsync) {
+    super();
+    setDefaults();
+    topic = "gerrit";
+    this.sendAsync = sendAsync;
+    initDockerizedKafkaServer();
+  }
+
   private void setDefaults() {
     put("acks", "all");
     put("retries", 0);
@@ -79,4 +91,12 @@
   public String getTopic() {
     return topic;
   }
+
+  public boolean isSendAsync() {
+    return sendAsync;
+  }
+
+  public String getBootstrapServers() {
+    return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+  }
 }
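
The camelCase keys in `gerrit.config` are translated into Kafka's dotted property names (e.g. `bootstrapServers` becomes `bootstrap.servers`), so the new flag surfaces as in this hypothetical snippet:

```
[plugin "@PLUGIN@"]
        bootstrapServers = localhost:9092
        sendAsync = false
```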
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java
new file mode 100644
index 0000000..17f3ebd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.config;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+
+public class KafkaPublisherProperties extends KafkaProperties {
+  private static final long serialVersionUID = 0L;
+
+  @Inject
+  public KafkaPublisherProperties(
+      PluginConfigFactory configFactory, @PluginName String pluginName) {
+    super(configFactory, pluginName);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
new file mode 100644
index 0000000..52d4726
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.config;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class KafkaSubscriberProperties extends KafkaProperties {
+  private static final long serialVersionUID = 1L;
+  private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+
+  private final Integer pollingInterval;
+  private final String groupId;
+  private final Integer numberOfSubscribers;
+
+  @Inject
+  public KafkaSubscriberProperties(
+      PluginConfigFactory configFactory, @PluginName String pluginName) {
+    super(configFactory, pluginName);
+
+    this.pollingInterval =
+        Integer.parseInt(getProperty("polling.interval.ms", DEFAULT_POLLING_INTERVAL_MS));
+    this.groupId = getProperty("group.id");
+    this.numberOfSubscribers =
+        Integer.parseInt(getProperty("number.of.subscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+  }
+
+  @VisibleForTesting
+  public KafkaSubscriberProperties(int pollingInterval, String groupId, int numberOfSubscribers) {
+    super(true);
+    this.pollingInterval = pollingInterval;
+    this.groupId = groupId;
+    this.numberOfSubscribers = numberOfSubscribers;
+  }
+
+  public Integer getPollingInterval() {
+    return pollingInterval;
+  }
+
+  public String getGroupId() {
+    return groupId;
+  }
+
+  public Integer getNumberOfSubscribers() {
+    return numberOfSubscribers;
+  }
+}
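
The `@VisibleForTesting` constructor lets tests build subscriber settings without a `PluginConfigFactory`; the new `KafkaBrokerApiTest` constructs its configuration this way. The values here are illustrative:

```java
// Polling interval 1000 ms, consumer group "test-group", six subscribers;
// super(true) in this constructor keeps async publishing enabled.
KafkaSubscriberProperties config =
    new KafkaSubscriberProperties(1000, "test-group", 6);
assert config.getPollingInterval() == 1000;
assert config.getGroupId().equals("test-group");
```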
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
new file mode 100644
index 0000000..083d032
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class KafkaEventsPublisherMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_success_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+  private final Counter1<String> brokerPublisherSuccessCounter;
+  private final Counter1<String> brokerPublisherFailureCounter;
+
+  @Inject
+  public KafkaEventsPublisherMetrics(MetricMaker metricMaker) {
+
+    this.brokerPublisherSuccessCounter =
+        metricMaker.newCounter(
+            "kafka/broker/broker_message_publisher_counter",
+            new Description("Number of successfully published messages by the broker publisher")
+                .setRate()
+                .setUnit("messages"),
+            Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+    this.brokerPublisherFailureCounter =
+        metricMaker.newCounter(
+            "kafka/broker/broker_message_publisher_failure_counter",
+            new Description("Number of messages failed to publish by the broker publisher")
+                .setRate()
+                .setUnit("errors"),
+            Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+  }
+
+  public void incrementBrokerPublishedMessage() {
+    brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  public void incrementBrokerFailedToPublishMessage() {
+    brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 21b89ce..cc271b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,9 +14,12 @@
 
 package com.googlesource.gerrit.plugins.kafka.publish;
 
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
@@ -49,4 +52,17 @@
       session.publish(gson.toJson(event));
     }
   }
+
+  public boolean publish(String topic, EventMessage event) {
+    return session.publish(topic, getPayload(event));
+  }
+
+  private String getPayload(EventMessage event) {
+    return gson.toJson(event);
+  }
+
+  @VisibleForTesting
+  public JsonObject eventToJson(Event event) {
+    return gson.toJsonTree(event).getAsJsonObject();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
new file mode 100644
index 0000000..b1f11f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+  private final KafkaProperties properties;
+
+  @Inject
+  public KafkaProducerProvider(KafkaProperties properties) {
+    this.properties = properties;
+  }
+
+  @Override
+  public KafkaProducer<String, String> get() {
+    return new KafkaProducer<>(properties);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index fcb5263..bb79cb5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -15,10 +15,14 @@
 package com.googlesource.gerrit.plugins.kafka.session;
 
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,11 +30,18 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
   private final KafkaProperties properties;
+  private final Provider<KafkaProducer<String, String>> producerProvider;
+  private final KafkaEventsPublisherMetrics publisherMetrics;
   private volatile Producer<String, String> producer;
 
   @Inject
-  public KafkaSession(KafkaProperties properties) {
+  public KafkaSession(
+      Provider<KafkaProducer<String, String>> producerProvider,
+      KafkaProperties properties,
+      KafkaEventsPublisherMetrics publisherMetrics) {
+    this.producerProvider = producerProvider;
     this.properties = properties;
+    this.publisherMetrics = publisherMetrics;
   }
 
   public boolean isOpen() {
@@ -52,7 +63,7 @@
      * ClassNotFoundExceptions
      */
     setConnectionClassLoader();
-    producer = new KafkaProducer<>(properties);
+    producer = producerProvider.get();
     LOGGER.info("Connection established.");
   }
 
@@ -70,6 +81,51 @@
   }
 
   public void publish(String messageBody) {
-    producer.send(new ProducerRecord<>(properties.getTopic(), "" + System.nanoTime(), messageBody));
+    publish(properties.getTopic(), messageBody);
+  }
+
+  public boolean publish(String topic, String messageBody) {
+    if (properties.isSendAsync()) {
+      return publishAsync(topic, messageBody);
+    }
+    return publishSync(topic, messageBody);
+  }
+
+  private boolean publishSync(String topic, String messageBody) {
+
+    try {
+      Future<RecordMetadata> future =
+          producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
+      RecordMetadata metadata = future.get();
+      LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+      publisherMetrics.incrementBrokerPublishedMessage();
+      return true;
+    } catch (Throwable e) {
+      LOGGER.error("Cannot send the message", e);
+      publisherMetrics.incrementBrokerFailedToPublishMessage();
+      return false;
+    }
+  }
+
+  private boolean publishAsync(String topic, String messageBody) {
+    try {
+      Future<RecordMetadata> future =
+          producer.send(
+              new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
+              (metadata, e) -> {
+                if (metadata != null && e == null) {
+                  LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+                  publisherMetrics.incrementBrokerPublishedMessage();
+                } else {
+                  LOGGER.error("Cannot send the message", e);
+                  publisherMetrics.incrementBrokerFailedToPublishMessage();
+                }
+              });
+      return future != null;
+    } catch (Throwable e) {
+      LOGGER.error("Cannot send the message", e);
+      publisherMetrics.incrementBrokerFailedToPublishMessage();
+      return false;
+    }
   }
 }
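
Caller-visible semantics of the new `publish(String, String)` overload, as a hedged sketch: in sync mode the returned boolean reflects the broker acknowledgement (`future.get()` completed), while in async mode it only means the producer accepted the record, with delivery failures surfacing later through the callback and the failure metric.

```java
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;

class PublishSemanticsSketch {
  static boolean publishGerritEvent(KafkaSession session, String json) {
    // sendAsync=false: true == acknowledged by the broker.
    // sendAsync=true:  true == handed off to the producer only.
    return session.publish("gerrit", json);
  }
}
```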
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
new file mode 100644
index 0000000..9ec109d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+class KafkaConsumerFactory {
+  private KafkaSubscriberProperties config;
+
+  @Inject
+  public KafkaConsumerFactory(KafkaSubscriberProperties configuration) {
+    this.config = configuration;
+  }
+
+  public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer) {
+    return new KafkaConsumer<>(config, keyDeserializer, new ByteArrayDeserializer());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
new file mode 100644
index 0000000..bab2ad0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+@Singleton
+public class KafkaEventDeserializer implements Deserializer<EventMessage> {
+
+  private final StringDeserializer stringDeserializer = new StringDeserializer();
+  private Gson gson;
+
+  // Used when this deserializer is configured by class name; in that case a
+  // configuration entry is needed to set the Gson provider.
+  public KafkaEventDeserializer() {}
+
+  @Inject
+  public KafkaEventDeserializer(Gson gson) {
+    this.gson = gson;
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public EventMessage deserialize(String topic, byte[] data) {
+    final EventMessage result =
+        gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
+    result.validate();
+
+    return result;
+  }
+
+  @Override
+  public void close() {}
+}
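
The envelope format this deserializer expects, reconstructed from the accompanying `KafkaEventDeserializerTest` (the UUIDs are illustrative); `result.validate()` is what makes the `{}` input in that test throw:

```json
{
  "header": {
    "eventId": "c1f0bd7e-3acb-4e0f-9b6c-1f5f2f2b8c11",
    "eventType": "project-created",
    "sourceInstanceId": "88b07b29-9f22-4fb1-b5d9-6f2cfd1a1d7c",
    "eventCreatedOn": 10
  },
  "body": { "type": "project-created" }
}
```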
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
new file mode 100644
index 0000000..40415a4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -0,0 +1,180 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<EventMessage> valueDeserializer;
+  private final KafkaSubscriberProperties configuration;
+  private final ExecutorService executor;
+  private final KafkaEventSubscriberMetrics subscriberMetrics;
+  private final KafkaConsumerFactory consumerFactory;
+  private final Deserializer<byte[]> keyDeserializer;
+
+  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private String topic;
+  private AtomicBoolean resetOffset = new AtomicBoolean(false);
+
+  private volatile ReceiverJob receiver;
+
+  @Inject
+  public KafkaEventSubscriber(
+      KafkaSubscriberProperties configuration,
+      KafkaConsumerFactory consumerFactory,
+      Deserializer<byte[]> keyDeserializer,
+      Deserializer<EventMessage> valueDeserializer,
+      OneOffRequestContext oneOffCtx,
+      @ConsumerExecutor ExecutorService executor,
+      KafkaEventSubscriberMetrics subscriberMetrics) {
+
+    this.configuration = configuration;
+    this.oneOffCtx = oneOffCtx;
+    this.executor = executor;
+    this.subscriberMetrics = subscriberMetrics;
+    this.consumerFactory = consumerFactory;
+    this.keyDeserializer = keyDeserializer;
+    this.valueDeserializer = valueDeserializer;
+  }
+
+  public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+    this.topic = topic;
+    this.messageProcessor = messageProcessor;
+    logger.atInfo().log(
+        "Kafka consumer subscribing to topic [%s]", topic);
+    runReceiver();
+  }
+
+  private void runReceiver() {
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
+      consumer.subscribe(Collections.singleton(topic));
+      receiver = new ReceiverJob(consumer);
+      executor.execute(receiver);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
+  }
+
+  public void shutdown() {
+    closed.set(true);
+    receiver.wakeup();
+  }
+
+  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public void resetOffset() {
+    resetOffset.set(true);
+  }
+
+  private class ReceiverJob implements Runnable {
+    private final Consumer<byte[], byte[]> consumer;
+
+    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+      this.consumer = consumer;
+    }
+
+    public void wakeup() {
+      consumer.wakeup();
+    }
+
+    @Override
+    public void run() {
+      try {
+        consume();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+      }
+    }
+
+    private void consume() throws InterruptedException {
+      try {
+        while (!closed.get()) {
+          if (resetOffset.getAndSet(false)) {
+            consumer.seekToBeginning(consumer.assignment());
+          }
+          ConsumerRecords<byte[], byte[]> consumerRecords =
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+          consumerRecords.forEach(
+              consumerRecord -> {
+                try (ManualRequestContext ctx = oneOffCtx.open()) {
+                  EventMessage event =
+                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                  messageProcessor.accept(event);
+                } catch (Exception e) {
+                  logger.atSevere().withCause(e).log(
+                      "Malformed event '%s'",
+                      new String(consumerRecord.value(), UTF_8));
+                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+                }
+              });
+        }
+      } catch (WakeupException e) {
+        // Ignore exception if closing
+        if (!closed.get()) {
+          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+          reconnectAfterFailure();
+        }
+      } catch (Exception e) {
+        subscriberMetrics.incrementSubscriberFailedToPollMessages();
+        logger.atSevere().withCause(e).log(
+            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+        reconnectAfterFailure();
+      } finally {
+        consumer.close();
+      }
+    }
+
+    private void reconnectAfterFailure() throws InterruptedException {
+      // Random delay averaging DELAY_RECONNECT_AFTER_FAILURE_MSEC, to avoid
+      // all failed consumers reconnecting at exactly the same interval
+      long reconnectDelay =
+          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+      Thread.sleep(reconnectDelay);
+      runReceiver();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
new file mode 100644
index 0000000..81174f9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -0,0 +1,48 @@
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class KafkaEventSubscriberMetrics {
+
+  private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
+      "subscriber_msg_consumer_poll_failure_counter";
+  private static final String SUBSCRIBER_FAILURE_COUNTER =
+      "subscriber_msg_consumer_failure_counter";
+
+  private final Counter1<String> subscriberPollFailureCounter;
+  private final Counter1<String> subscriberFailureCounter;
+
+  @Inject
+  public KafkaEventSubscriberMetrics(MetricMaker metricMaker) {
+    this.subscriberPollFailureCounter =
+        metricMaker.newCounter(
+            "kafka/subscriber/subscriber_message_consumer_poll_failure_counter",
+            new Description("Number of failed attempts to poll messages by the subscriber")
+                .setRate()
+                .setUnit("errors"),
+            Field.ofString(
+                SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
+    this.subscriberFailureCounter =
+        metricMaker.newCounter(
+            "kafka/subscriber/subscriber_message_consumer_failure_counter",
+            new Description("Number of messages failed to consume by the subscriber consumer")
+                .setRate()
+                .setUnit("errors"),
+            Field.ofString(
+                SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+  }
+
+  public void incrementSubscriberFailedToPollMessages() {
+    subscriberPollFailureCounter.increment(SUBSCRIBER_POLL_FAILURE_COUNTER);
+  }
+
+  public void incrementSubscriberFailedToConsumeMessage() {
+    subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 685c195..c6dc18e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -1 +1,27 @@
-This plugin can publish gerrit stream events to an Apache Kafka topic.
+This plugin publishes Gerrit stream events to an Apache Kafka topic.
+
+It also provides a Kafka-based implementation of a generic
+[Events Broker Api](https://github.com/GerritForge/events-broker) which can be used by
+Gerrit and other plugins.
+
+Use-cases
+=========
+
+CI/CD Validation
+----------------
+
+Gerrit stream events can be published to the internal network, where subscribers
+can trigger automated jobs (e.g. CI/CD validation) that fetch the changes and validate
+them through build and test.
+
+__NOTE__: This use-case would require a CI/CD system (e.g. Jenkins, Zuul or other) and
+the development of a Kafka-based subscriber, as sketched below, to receive the events
+and trigger the build.
+
+Events replication
+------------------
+
+In a multi-site setup, multiple Gerrit masters can be informed of the stream events
+happening on every node through notifications published to a Kafka pub/sub topic.
+
+__NOTE__: This use-case would require the [multi-site plugin](https://gerrit.googlesource.com/plugins/multi-site)
+on each of the Gerrit masters that are part of the same multi-site cluster.
\ No newline at end of file
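
For the CI/CD use-case above, a minimal, hypothetical subscriber built on plain `kafka-clients` (the broker address, group id, and the build-trigger action are all assumptions):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CiEventSubscriber {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "ci-validation");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // The plugin publishes to the "gerrit" topic by default.
      consumer.subscribe(Collections.singleton("gerrit"));
      while (true) {
        for (ConsumerRecord<String, String> record :
            consumer.poll(Duration.ofSeconds(1))) {
          // Here a real subscriber would parse the JSON event and
          // schedule a validation build for the referenced change.
          System.out.println("Would trigger CI for event: " + record.value());
        }
      }
    }
  }
}
```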
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 4db7b38..727b7e5 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -7,7 +7,7 @@
 ---------------------
 
 ```
-[plugin "events-kafka"]
+[plugin "@PLUGIN@"]
         bootstrapServers = localhost:9092
 ```
 
@@ -31,4 +31,20 @@
 | lingerMs            | 1
 | bufferMemory        | 33554432
 | keySerializer       | org.apache.kafka.common.serialization.StringSerializer
-| valueSerializer     | org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
+| valueSerializer     | org.apache.kafka.common.serialization.StringSerializer
+
+Additional properties
+---------------------
+
+`plugin.@PLUGIN@.groupId`
+:	Kafka consumer group for receiving messages.
+	Default: Gerrit instance-id
+
+`plugin.@PLUGIN@.pollingIntervalMs`
+:	Polling interval in msec for receiving messages from Kafka topic subscription.
+	Default: 1000
+
+`plugin.@PLUGIN@.sendAsync`
+:	Send messages to Kafka asynchronously, detaching the calling process from the
+	acknowledgement of the message being sent.
+	Default: true
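
Putting the new knobs together, a hypothetical `gerrit.config` fragment for a site that wants a dedicated consumer group and faster polling:

```
[plugin "@PLUGIN@"]
        bootstrapServers = localhost:9092
        groupId = gerrit-event-consumers
        pollingIntervalMs = 500
        sendAsync = true
```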
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 0cafbdd..fbae0c6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -16,7 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.google.common.base.Supplier;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.GerritConfig;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -28,10 +28,7 @@
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventDeserializer;
-import com.google.gerrit.server.events.SupplierDeserializer;
 import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -104,29 +101,18 @@
       }
     }
 
-    // TODO(davido): Remove special ReviewDb case when it is killed
-    // In ReviewDb case 3 events are received in the following order:
-    // 1. refUpdate:          ref: refs/changes/01/1/1
-    // 2. patchset-created:   ref: refs/changes/01/1/1
-    // 3. comment-added:      ref: refs/heads/master
-    int numberOfEvents = 3;
-    if (notesMigration.commitChangeWrites()) {
-      // In NoteDb case the 4 events are received in the following order:
-      // 1. refUpdate:        ref: refs/sequences/changes
-      // 2. refUpdate:        ref: refs/changes/01/1/1
-      // 3. patchset-created: ref: refs/changes/01/1/1
-      // 4. comment-added:    ref: refs/heads/master
-      numberOfEvents = 4;
-    }
+    // Six events are received in the following order:
+    // 1. refUpdate:        ref: refs/sequences/changes
+    // 2. refUpdate:        ref: refs/changes/01/1/1
+    // 3. refUpdate:        ref: refs/changes/01/1/meta
+    // 4. patchset-created: ref: refs/changes/01/1/1
+    // 5. refUpdate:        ref: refs/changes/01/1/meta
+    // 6. comment-added:    ref: refs/heads/master
 
-    assertThat(events).hasSize(numberOfEvents);
+    assertThat(events).hasSize(6);
     String commentAddedEventJson = Iterables.getLast(events);
 
-    Gson gson =
-        new GsonBuilder()
-            .registerTypeAdapter(Event.class, new EventDeserializer())
-            .registerTypeAdapter(Supplier.class, new SupplierDeserializer())
-            .create();
+    Gson gson = new EventGsonProvider().get();
     Event event = gson.fromJson(commentAddedEventJson, Event.class);
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
new file mode 100644
index 0000000..eabc833
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -0,0 +1,216 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.containers.KafkaContainer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerApiTest {
+  private static KafkaContainer kafka;
+
+  private static final int TEST_NUM_SUBSCRIBERS = 1;
+  private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
+  private static final int TEST_POLLING_INTERVAL_MSEC = 100;
+  private static final int TEST_THREAD_POOL_SIZE = 10;
+  private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+  private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS;
+  private static final int TEST_TIMEOUT = 30;
+
+  private Injector injector;
+  private KafkaSession session;
+  private Gson gson;
+
+  public static class TestKafkaContainer extends KafkaContainer {
+    public TestKafkaContainer() {
+      addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+      addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
+    }
+
+    @Override
+    public String getBootstrapServers() {
+      return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), KAFKA_PORT);
+    }
+  }
+
+  public static class TestWorkQueue extends WorkQueue {
+
+    @Inject
+    public TestWorkQueue(IdGenerator idGenerator, MetricMaker metrics) {
+      super(idGenerator, TEST_THREAD_POOL_SIZE, metrics);
+    }
+  }
+
+  public static class TestModule extends AbstractModule {
+    private KafkaProperties kafkaProperties;
+
+    public TestModule(KafkaProperties kafkaProperties) {
+      this.kafkaProperties = kafkaProperties;
+    }
+
+    @Override
+    protected void configure() {
+      bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+      bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS));
+      bind(OneOffRequestContext.class)
+          .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
+
+      bind(KafkaProperties.class).toInstance(kafkaProperties);
+      bind(KafkaSession.class).in(Scopes.SINGLETON);
+      KafkaSubscriberProperties kafkaSubscriberProperties =
+          new KafkaSubscriberProperties(
+              TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
+      bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+      bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+          .toProvider(KafkaProducerProvider.class);
+
+      bind(WorkQueue.class).to(TestWorkQueue.class);
+    }
+  }
+
+  public static class TestConsumer implements Consumer<EventMessage> {
+    public final List<EventMessage> messages = new ArrayList<>();
+    private final CountDownLatch lock;
+
+    public TestConsumer(int numMessagesExpected) {
+      lock = new CountDownLatch(numMessagesExpected);
+    }
+
+    @Override
+    public void accept(EventMessage message) {
+      messages.add(message);
+      lock.countDown();
+    }
+
+    public boolean await() {
+      try {
+        return lock.await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+  }
+
+  public static class TestHeader extends Header {
+
+    public TestHeader() {
+      super(UUID.randomUUID(), TEST_INSTANCE_ID);
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    kafka = new TestKafkaContainer();
+    kafka.start();
+    System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (kafka != null) {
+      kafka.stop();
+    }
+  }
+
+  public void connectToKafka(KafkaProperties kafkaProperties) {
+    Injector baseInjector = Guice.createInjector(new TestModule(kafkaProperties));
+    WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
+    KafkaSubscriberProperties kafkaSubscriberProperties =
+        baseInjector.getInstance(KafkaSubscriberProperties.class);
+    injector =
+        baseInjector.createChildInjector(
+            new KafkaApiModule(testWorkQueue, kafkaSubscriberProperties));
+    session = injector.getInstance(KafkaSession.class);
+    gson = injector.getInstance(Gson.class);
+
+    session.connect();
+  }
+
+  @After
+  public void teardown() {
+    if (session != null) {
+      session.disconnect();
+    }
+  }
+
+  @Test
+  public void shouldSendSyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(false));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = "test_topic_sync";
+    TestConsumer testConsumer = new TestConsumer(1);
+    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+  }
+
+  @Test
+  public void shouldSendAsyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(true));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = "test_topic_async";
+    TestConsumer testConsumer = new TestConsumer(1);
+    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
new file mode 100644
index 0000000..5aa9ca8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -0,0 +1,127 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSessionTest {
+  KafkaSession objectUnderTest;
+  @Mock KafkaProducer<String, String> kafkaProducer;
+  @Mock KafkaProducerProvider producerProvider;
+  @Mock KafkaProperties properties;
+  @Mock KafkaEventsPublisherMetrics publisherMetrics;
+  @Captor ArgumentCaptor<Callback> callbackCaptor;
+
+  RecordMetadata recordMetadata;
+  String message = "sample_message";
+  private String topic = "index";
+
+  @Before
+  public void setUp() {
+    when(producerProvider.get()).thenReturn(kafkaProducer);
+    when(properties.getTopic()).thenReturn(topic);
+
+    recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
+
+    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+    objectUnderTest.connect();
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+    objectUnderTest.publish(message);
+    verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+    objectUnderTest.publish(message);
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
+    try {
+      objectUnderTest.publish(message);
+    } catch (RuntimeException e) {
+      // expected
+    }
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+    objectUnderTest.publish(message);
+
+    verify(kafkaProducer).send(any(), callbackCaptor.capture());
+    callbackCaptor.getValue().onCompletion(recordMetadata, null);
+    verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any()))
+        .thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+    objectUnderTest.publish(message);
+
+    verify(kafkaProducer).send(any(), callbackCaptor.capture());
+    callbackCaptor.getValue().onCompletion(null, new Exception());
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any()))
+        .thenThrow(new RuntimeException("Unexpected runtime exception"));
+    try {
+      objectUnderTest.publish(message);
+    } catch (RuntimeException e) {
+      // expected
+    }
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
new file mode 100644
index 0000000..e456a2a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gson.Gson;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaEventDeserializerTest {
+  private KafkaEventDeserializer deserializer;
+
+  @Before
+  public void setUp() {
+    final Gson gson = new EventGsonProvider().get();
+    deserializer = new KafkaEventDeserializer(gson);
+  }
+
+  @Test
+  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+    final UUID eventId = UUID.randomUUID();
+    final String eventType = "event-type";
+    final UUID sourceInstanceId = UUID.randomUUID();
+    final long eventCreatedOn = 10L;
+    final String eventJson =
+        String.format(
+            "{ "
+                + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
+                + "\"body\": { \"type\": \"project-created\" }"
+                + "}",
+            eventId, eventType, sourceInstanceId, eventCreatedOn);
+    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+    assertThat(event.getHeader().eventId).isEqualTo(eventId);
+    assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void kafkaEventDeserializerShouldFailForInvalidJson() {
+    deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
+    deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
+  }
+}