Merge branch 'stable-3.3'

* stable-3.3:
  Deserialize Event and EventMessage
  Fix issue with message serialisation
  Bump events-broker to v3.3.2
  Add message content validation

Change-Id: I86d8e4f559ec4bf979ab31dded198abfbfc6553f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 5a21ad4..d26ffb9 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -45,8 +45,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
     )
 
     maven_jar(
@@ -161,4 +161,4 @@
         name = "grpc-auth",
         artifact = "io.grpc:grpc-auth:1.36.0",
         sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
-    )
\ No newline at end of file
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
index c989aa4..7d07a5f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -18,6 +18,7 @@
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Map;
@@ -42,7 +43,7 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, EventMessage message) {
     return publishers.computeIfAbsent(topic, t -> publisherFactory.create(t)).publish(message);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
index 54efac3..614d0aa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -33,7 +33,6 @@
   private final String gcloudProject;
   private final String subscriptionId;
   private final Integer numberOfSubscribers;
-  private final Boolean sendAsync;
   private final String privateKeyLocation;
   private final Integer ackDeadlineSeconds;
   private final Long subscribtionTimeoutInSeconds;
@@ -47,7 +46,6 @@
       @PluginName String pluginName,
       @Nullable @GerritInstanceId String instanceId) {
     this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    this.sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
     this.gcloudProject = getMandatoryString("gcloudProject");
     this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
     this.privateKeyLocation = getMandatoryString("privateKeyLocation");
@@ -68,10 +66,6 @@
             fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
   }
 
-  public Boolean isSendAsync() {
-    return sendAsync;
-  }
-
   public String getGCloudProject() {
     return gcloudProject;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index a802d71..850a96d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -20,6 +20,9 @@
 import com.google.api.core.ApiFutures;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
@@ -28,9 +31,8 @@
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 public class PubSubPublisher {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -60,26 +62,21 @@
     this.pubSubProperties = pubSubProperties;
   }
 
-  public boolean publish(Event event) {
+  public ListenableFuture<Boolean> publish(Event event) {
     return publish(gson.toJson(event));
   }
 
-  public boolean publish(EventMessage event) {
+  public ListenableFuture<Boolean> publish(EventMessage event) {
     return publish(gson.toJson(event));
   }
 
-  private boolean publish(String eventPayload) {
+  private ListenableFuture<Boolean> publish(String eventPayload) {
     ByteString data = ByteString.copyFromUtf8(eventPayload);
     PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
-    if (pubSubProperties.isSendAsync()) {
-      return publishAsync(pubsubMessage) != null;
-    }
-
-    publishSync(pubsubMessage);
-    return true;
+    return publishAsync(pubsubMessage);
   }
 
-  private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
+  private ListenableFuture<Boolean> publishAsync(PubsubMessage pubsubMessage) {
     ApiFuture<String> publish = publisher.publish(pubsubMessage);
     ApiFutures.addCallback(
         publish,
@@ -102,17 +99,11 @@
           }
         },
         MoreExecutors.directExecutor());
-    return publish;
-  }
 
-  private void publishSync(PubsubMessage pubsubMessage) {
-    try {
-      ApiFuture<String> messageIdFuture = publishAsync(pubsubMessage);
-      messageIdFuture.get(1000, TimeUnit.SECONDS);
-
-    } catch (InterruptedException | ExecutionException | TimeoutException e) {
-      logger.atSevere().withCause(e).log("Cannot send the message");
-    }
+    return Futures.transform(
+        JdkFutureAdapters.listenInPoolThread(publish),
+        Objects::nonNull,
+        MoreExecutors.directExecutor());
   }
 
   public void close() throws InterruptedException {
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 2e93838..f3263d3 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -34,16 +34,6 @@
     to allocate a thread pool able to run all subscribers.
     Default: 6
 
-`plugin.events-gcloud-pubsub.sendAsync`
-:   Optional. Send messages to GCloud PubSub asynchronously, detaching the calling
-    process from the acknowledge of the message being sent.
-    The drawback of the enabling the sendAsync parameter is that the broker would only
-    return the status of the successful invocation of the message send operation and not
-    the actual status received by the broker. This means that when sendAsync is enabled
-    'broker_msg_publisher_failure_counter' metric is not incremented when message send
-    failure occurs.
-    Default: true
-
 `plugin.events-gcloud-pubsub.ackDeadlineSeconds`
 :   Optional. The approximate amount of time (on a best-effort basis) Pub/Sub waits for
     the subscriber to acknowledge receipt before resending the message.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
index c009973..97a2c8d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -60,18 +60,6 @@
 
   @Test
   public void shouldIncrementFailedToPublishMessageWhenAsyncPublishFails() {
-    when(confMock.isSendAsync()).thenReturn(true);
-    when(publisherMock.publish(any()))
-        .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
-
-    objectUnderTest.publish(eventMessage);
-
-    verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
-  }
-
-  @Test
-  public void shouldIncrementFailedToPublishMessageWhenSyncPublishFails() {
-    when(confMock.isSendAsync()).thenReturn(false);
     when(publisherMock.publish(any()))
         .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
 
@@ -82,17 +70,6 @@
 
   @Test
   public void shouldIncrementSuccessToPublishMessageWhenAsyncPublishSucceeds() {
-    when(confMock.isSendAsync()).thenReturn(true);
-    when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
-
-    objectUnderTest.publish(eventMessage);
-
-    verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
-  }
-
-  @Test
-  public void shouldIncrementSuccessToPublishMessageWhenSyncPublishSucceeds() {
-    when(confMock.isSendAsync()).thenReturn(false);
     when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
 
     objectUnderTest.publish(eventMessage);