Implements async send interface

Events-broker 3.4.0-rc2 version exposes an async send API.
This allows, for example, to attach a callback to the API return value
and correctly calculate publisher metrics.

Bug: Issue 14396
Change-Id: If26a056b5edfb588d16a6df5f9b3008153f1e5f5
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index fc1b8ad..77e9d52 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0-rc0",
-        sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index f44f4f7..44c4554 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -22,6 +22,7 @@
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
@@ -72,6 +73,8 @@
       install(new IndexModule());
     }
 
+    install(new BrokerModule());
+
     install(new RouterModule());
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 71be5e6..cff9899 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -17,6 +17,10 @@
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
@@ -26,9 +30,14 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BrokerApiWrapper implements BrokerApi {
+  private static final Logger log = LoggerFactory.getLogger(BrokerApiWrapper.class);
+  private final Executor executor;
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
@@ -36,37 +45,64 @@
 
   @Inject
   public BrokerApiWrapper(
+      @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
       MessageLogger msgLog,
       @InstanceId UUID instanceId) {
     this.apiDelegate = apiDelegate;
+    this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
     this.instanceId = instanceId;
   }
 
   public boolean send(String topic, Event event) {
-    return send(topic, apiDelegate.get().newMessage(instanceId, event));
+    try {
+      return send(topic, apiDelegate.get().newMessage(instanceId, event)).get();
+    } catch (Throwable e) {
+      log.error(
+          "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
+          event,
+          topic,
+          e.getMessage(),
+          e.getStackTrace());
+      metrics.incrementBrokerFailedToPublishMessage();
+      return false;
+    }
   }
 
   @Override
-  public boolean send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, EventMessage message) {
+    SettableFuture<Boolean> resultFuture = SettableFuture.create();
     if (Context.isForwardedEvent()) {
-      return true;
+      resultFuture.set(true);
+      return resultFuture;
     }
-    boolean succeeded = false;
-    try {
-      succeeded = apiDelegate.get().send(topic, message);
-    } finally {
-      if (succeeded) {
-        msgLog.log(Direction.PUBLISH, topic, message);
-        metrics.incrementBrokerPublishedMessage();
-      } else {
-        metrics.incrementBrokerFailedToPublishMessage();
-      }
-    }
-    return succeeded;
+
+    ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
+    Futures.addCallback(
+        resfultF,
+        new FutureCallback<Boolean>() {
+          @Override
+          public void onSuccess(Boolean result) {
+            msgLog.log(Direction.PUBLISH, topic, message);
+            metrics.incrementBrokerPublishedMessage();
+          }
+
+          @Override
+          public void onFailure(Throwable throwable) {
+            log.error(
+                "Failed to publish message '{}' to topic '{}' - error: {}",
+                message.toString(),
+                topic,
+                throwable.getMessage());
+            metrics.incrementBrokerFailedToPublishMessage();
+          }
+        },
+        executor);
+
+    return resfultF;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
new file mode 100644
index 0000000..aa24eb1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+@interface BrokerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
new file mode 100644
index 0000000..e843263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
+
+@Singleton
+class BrokerExecutorProvider extends ExecutorProvider {
+
+  @Inject
+  BrokerExecutorProvider(WorkQueue workQueue) {
+    super(workQueue, 1, "Multi-Site-Broker");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..a5dac4a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import java.util.concurrent.Executor;
+
+public class BrokerModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    bind(Executor.class)
+        .annotatedWith(BrokerExecutor.class)
+        .toProvider(BrokerExecutorProvider.class);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 92fa101..bb63caf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -6,6 +6,8 @@
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
@@ -31,19 +33,27 @@
   public void setUp() {
     objectUnderTest =
         new BrokerApiWrapper(
-            DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
+            MoreExecutors.directExecutor(),
+            DynamicItem.itemOf(BrokerApi.class, brokerApi),
+            brokerMetrics,
+            msgLog,
+            instanceId);
   }
 
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
-    when(brokerApi.send(any(), any())).thenReturn(true);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.set(true);
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
   @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
-    when(brokerApi.send(any(), any())).thenReturn(false);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.setException(new Exception("Force Future failure"));
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }