Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Deserialize Event and EventMessage

Change-Id: I80ad1d967b164e2284cd2bcbc63190630fbbd4e2
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bd4656f..4b24917 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2940b5e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,9 +14,9 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..7eabf2d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -17,6 +17,7 @@
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,7 +43,7 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> send(String topic, EventMessage event) {
     return publisher.publish(topic, event);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..7b8d480 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -16,6 +16,7 @@
 
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
@@ -53,7 +54,7 @@
     }
   }
 
-  public boolean publish(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> publish(String topic, EventMessage event) {
     return session.publish(topic, getPayload(event));
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
 
 package com.googlesource.gerrit.plugins.kafka.session;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
     producer = null;
   }
 
-  public void publish(String messageBody) {
-    publish(properties.getTopic(), messageBody);
+  public ListenableFuture<Boolean> publish(String messageBody) {
+    return publish(properties.getTopic(), messageBody);
   }
 
-  public boolean publish(String topic, String messageBody) {
+  public ListenableFuture<Boolean> publish(String topic, String messageBody) {
     if (properties.isSendAsync()) {
       return publishAsync(topic, messageBody);
     }
     return publishSync(topic, messageBody);
   }
 
-  private boolean publishSync(String topic, String messageBody) {
-
+  private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+    SettableFuture<Boolean> resultF = SettableFuture.create();
     try {
       Future<RecordMetadata> future =
           producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
       publisherMetrics.incrementBrokerPublishedMessage();
-      return true;
+      resultF.set(true);
+      return resultF;
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 
-  private boolean publishAsync(String topic, String messageBody) {
+  private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
     try {
       Future<RecordMetadata> future =
           producer.send(
@@ -121,11 +128,16 @@
                   publisherMetrics.incrementBrokerFailedToPublishMessage();
                 }
               });
-      return future != null;
+
+      // The transformation is lightweight, so we can afford using a directExecutor
+      return Futures.transform(
+          JdkFutureAdapters.listenInPoolThread(future),
+          Objects::nonNull,
+          MoreExecutors.directExecutor());
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 118a868..7ae9279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,7 +19,6 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
@@ -33,6 +32,7 @@
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 48350f9..75f3330 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,10 +17,10 @@
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.util.IdGenerator;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 4074919..f5b6861 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -17,8 +17,8 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gson.Gson;
 import java.util.UUID;
 import org.junit.Before;