Implement async send method as per 3.4.0-rc2 API

Bug: Issue 14411
Change-Id: If44260c12d924ba62a9c066058f2b7aa9fc93d69
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index e059b34..4b24917 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0-rc0",
-        sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..7eabf2d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -17,6 +17,7 @@
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,7 +43,7 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> send(String topic, EventMessage event) {
     return publisher.publish(topic, event);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..7b8d480 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -16,6 +16,7 @@
 
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
@@ -53,7 +54,7 @@
     }
   }
 
-  public boolean publish(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> publish(String topic, EventMessage event) {
     return session.publish(topic, getPayload(event));
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
 
 package com.googlesource.gerrit.plugins.kafka.session;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
     producer = null;
   }
 
-  public void publish(String messageBody) {
-    publish(properties.getTopic(), messageBody);
+  public ListenableFuture<Boolean> publish(String messageBody) {
+    return publish(properties.getTopic(), messageBody);
   }
 
-  public boolean publish(String topic, String messageBody) {
+  public ListenableFuture<Boolean> publish(String topic, String messageBody) {
     if (properties.isSendAsync()) {
       return publishAsync(topic, messageBody);
     }
     return publishSync(topic, messageBody);
   }
 
-  private boolean publishSync(String topic, String messageBody) {
-
+  private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+    SettableFuture<Boolean> resultF = SettableFuture.create();
     try {
       Future<RecordMetadata> future =
           producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
       publisherMetrics.incrementBrokerPublishedMessage();
-      return true;
+      resultF.set(true);
+      return resultF;
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 
-  private boolean publishAsync(String topic, String messageBody) {
+  private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
     try {
       Future<RecordMetadata> future =
           producer.send(
@@ -121,11 +128,16 @@
                   publisherMetrics.incrementBrokerFailedToPublishMessage();
                 }
               });
-      return future != null;
+
+      // The transformation is lightweight, so we can afford using a directExecutor
+      return Futures.transform(
+          JdkFutureAdapters.listenInPoolThread(future),
+          Objects::nonNull,
+          MoreExecutors.directExecutor());
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 211dd4f..8ea5f3f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -37,7 +37,7 @@
   public void kafkaEventDeserializerShouldParseAKafkaEvent() {
     final UUID eventId = UUID.randomUUID();
     final String eventType = "event-type";
-    final UUID sourceInstanceId = UUID.randomUUID();
+    final String sourceInstanceId = UUID.randomUUID().toString();
     final long eventCreatedOn = 10L;
     final String eventJson =
         String.format(