Implement async send method as per new Events Broker API

Bug: Issue 14428
Change-Id: I0ae55fdf47f0086f5ef6f472cfdb616f7d5c08f1
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index ad74876..7aa154e 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0-rc0",
-        sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 4e4ca3f..1c053b4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -17,6 +17,7 @@
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import java.util.Collections;
@@ -42,11 +43,7 @@
   }
 
   @Override
-  public boolean send(String streamName, EventMessage event) {
-    return sendWithResult(streamName, event).isSuccess();
-  }
-
-  PublishResult sendWithResult(String streamName, EventMessage event) {
+  public ListenableFuture<Boolean> send(String streamName, EventMessage event) {
     return kinesisPublisher.publish(
         streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 9547191..225649a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -14,21 +14,20 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.amazonaws.services.kinesis.producer.Attempt;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -61,80 +60,34 @@
     publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
   }
 
-  PublishResult publish(String streamName, String stringEvent, String partitionKey) {
+  ListenableFuture<Boolean> publish(String streamName, String stringEvent, String partitionKey) {
     if (configuration.isSendAsync()) {
       return publishAsync(streamName, stringEvent, partitionKey);
     }
     return publishSync(streamName, stringEvent, partitionKey);
   }
 
-  private PublishResult publishSync(String streamName, String stringEvent, String partitionKey) {
-    logger.atFiner().log(
-        "KINESIS PRODUCER - Attempt to publish event %s to stream %s [PK: %s]",
-        stringEvent, streamName, partitionKey);
-
-    UserRecordResult result = null;
+  private ListenableFuture<Boolean> publishSync(
+      String streamName, String stringEvent, String partitionKey) {
+    SettableFuture<Boolean> resultFuture = SettableFuture.create();
     try {
-      result =
-          kinesisProducer
-              .addUserRecord(streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()))
-              .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS);
-
-      List<Attempt> attemptsDetails = result.getAttempts();
-      int numberOfAttempts = attemptsDetails.size();
-      if (result.isSuccessful()) {
-        logger.atFine().log(
-            "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
-            stringEvent,
-            result.getShardId(),
-            partitionKey,
-            result.getSequenceNumber(),
-            numberOfAttempts);
-        return PublishResult.success(numberOfAttempts);
-      } else {
-        int currentIdx = numberOfAttempts - 1;
-        int previousIdx = currentIdx - 1;
-        Attempt current = attemptsDetails.get(currentIdx);
-        if (previousIdx >= 0) {
-          Attempt previous = attemptsDetails.get(previousIdx);
-          logger.atSevere().log(
-              String.format(
-                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s. Previous failure - %s : %s",
-                  stringEvent,
-                  partitionKey,
-                  current.getErrorCode(),
-                  current.getErrorMessage(),
-                  previous.getErrorCode(),
-                  previous.getErrorMessage()));
-        } else {
-          logger.atSevere().log(
-              String.format(
-                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s.",
-                  stringEvent, partitionKey, current.getErrorCode(), current.getErrorMessage()));
-        }
-      }
-    } catch (InterruptedException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Interrupted publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
-    } catch (ExecutionException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Error when publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
-    } catch (TimeoutException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Timeout when publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
+      resultFuture.set(
+          publishAsync(streamName, stringEvent, partitionKey)
+              .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS));
+    } catch (CancellationException
+        | ExecutionException
+        | InterruptedException
+        | TimeoutException futureException) {
+      logger.atSevere().withCause(futureException).log(
+          "KINESIS PRODUCER - Failed publishing event %s [PK: %s]", stringEvent, partitionKey);
+      resultFuture.set(false);
     }
 
-    return PublishResult.failure(
-        Optional.ofNullable(result).map(r -> r.getAttempts().size()).orElse(0));
+    return resultFuture;
   }
 
-  private PublishResult publishAsync(String streamName, String stringEvent, String partitionKey) {
+  private ListenableFuture<Boolean> publishAsync(
+      String streamName, String stringEvent, String partitionKey) {
     try {
       ListenableFuture<UserRecordResult> publishF =
           kinesisProducer.addUserRecord(
@@ -162,12 +115,13 @@
             }
           },
           callBackExecutor);
+
+      return Futures.transform(
+          publishF, res -> res != null && res.isSuccessful(), callBackExecutor);
     } catch (Exception e) {
       logger.atSevere().withCause(e).log(
           "KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
-      return PublishResult.failure(1);
+      return Futures.immediateFailedFuture(e);
     }
-
-    return PublishResult.success(1);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
deleted file mode 100644
index 202f3e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kinesis;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class PublishResult {
-  public abstract boolean isSuccess();
-
-  public abstract int attempts();
-
-  public static PublishResult success(int attempts) {
-    return new AutoValue_PublishResult(true, attempts);
-  }
-
-  public static PublishResult failure(int attempts) {
-    return new AutoValue_PublishResult(false, attempts);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 7b5dded..d7e4d07 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -15,12 +15,14 @@
 package com.googlesource.gerrit.plugins.kinesis;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.WaitUtil;
@@ -30,6 +32,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,8 +56,10 @@
   // lease on the newly created stream
   private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
   private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
+  private static final long SEND_TIMEOUT_MILLIS = 200;
 
   private static final int LOCALSTACK_PORT = 4566;
+
   private LocalStackContainer localstack =
       new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.8"))
           .withServices(DYNAMODB, KINESIS, CLOUDWATCH)
@@ -147,48 +154,52 @@
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
-  public void sendingSynchronouslyShouldRetryUntilSuccessful() {
+  public void sendingSynchronouslyShouldBeSuccessful()
+      throws InterruptedException, ExecutionException {
     String streamName = UUID.randomUUID().toString();
     createStreamAsync(streamName);
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
-    assertThat(publishResult.attempts()).isGreaterThan(1);
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get()).isTrue();
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
-  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut() {
+  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut()
+      throws InterruptedException, ExecutionException, TimeoutException {
     String streamName = "not-existing-stream";
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isFalse();
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isFalse();
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
-  public void sendingAsynchronouslyShouldBeImmediatelySuccessfulEvenWhenStreamDoesNotExist() {
+  public void sendingAsynchronouslyShouldFailWhenStreamDoesNotExist() {
     String streamName = "not-existing-stream";
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
+    assertThat(executionException)
+        .hasMessageThat()
+        .contains("com.amazonaws.services.kinesis.producer.UserRecordFailedException");
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
-  public void sendingAsynchronouslyShouldBeImmediatelySuccessful() {
+  public void sendingAsynchronouslyShouldBeSuccessful()
+      throws InterruptedException, ExecutionException {
     String streamName = UUID.randomUUID().toString();
     createStreamAsync(streamName);
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
-    assertThat(publishResult.attempts()).isEqualTo(1);
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get()).isTrue();
   }
 
   public KinesisBrokerApi kinesisBroker() {