Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Reduce verbose logging when processing message

Change-Id: Ic30872124e0393f75849de30eecf27a38b66dddd
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0006f37..388a15e 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 4e4ca3f..0316c06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -15,9 +15,9 @@
 package com.googlesource.gerrit.plugins.kinesis;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
-import com.google.gson.Gson;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Set;
@@ -28,31 +28,24 @@
 class KinesisBrokerApi implements BrokerApi {
   private final KinesisConsumer.Factory consumerFactory;
 
-  private final Gson gson;
   private final KinesisPublisher kinesisPublisher;
   private final Set<KinesisConsumer> consumers;
 
   @Inject
   public KinesisBrokerApi(
-      Gson gson, KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
-    this.gson = gson;
+      KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
     this.kinesisPublisher = kinesisPublisher;
     this.consumerFactory = consumerFactory;
     this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
   }
 
   @Override
-  public boolean send(String streamName, EventMessage event) {
-    return sendWithResult(streamName, event).isSuccess();
-  }
-
-  PublishResult sendWithResult(String streamName, EventMessage event) {
-    return kinesisPublisher.publish(
-        streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
+  public ListenableFuture<Boolean> send(String streamName, Event event) {
+    return kinesisPublisher.publish(streamName, event);
   }
 
   @Override
-  public void receiveAsync(String streamName, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String streamName, Consumer<Event> eventConsumer) {
     KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
     consumers.add(consumer);
     consumer.subscribe(streamName, eventConsumer);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index 4a610b5..b563e26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -26,7 +26,7 @@
 
 class KinesisConsumer {
   interface Factory {
-    KinesisConsumer create(String topic, Consumer<EventMessage> messageProcessor);
+    KinesisConsumer create(String topic, Consumer<Event> messageProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -36,7 +36,7 @@
   private final ExecutorService executor;
   private Scheduler kinesisScheduler;
 
-  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private java.util.function.Consumer<Event> messageProcessor;
   private String streamName;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
@@ -52,8 +52,7 @@
     this.executor = executor;
   }
 
-  public void subscribe(
-      String streamName, java.util.function.Consumer<EventMessage> messageProcessor) {
+  public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
     this.streamName = streamName;
     this.messageProcessor = messageProcessor;
 
@@ -61,7 +60,7 @@
     runReceiver(messageProcessor);
   }
 
-  private void runReceiver(java.util.function.Consumer<EventMessage> messageProcessor) {
+  private void runReceiver(java.util.function.Consumer<Event> messageProcessor) {
     this.kinesisScheduler =
         schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
     executor.execute(kinesisScheduler);
@@ -81,7 +80,7 @@
     logger.atInfo().log("Shutdown kinesis consumer of stream %s completed.", getStreamName());
   }
 
-  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+  public java.util.function.Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 9547191..158f8ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -14,21 +14,21 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.amazonaws.services.kinesis.producer.Attempt;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -46,7 +46,7 @@
 
   @Inject
   public KinesisPublisher(
-      Gson gson,
+      @EventGson Gson gson,
       KinesisProducer kinesisProducer,
       Configuration configuration,
       @ProducerCallbackExecutor ExecutorService callBackExecutor) {
@@ -58,83 +58,37 @@
 
   @Override
   public void onEvent(Event event) {
-    publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
+    publish(configuration.getStreamEventsTopic(), event);
   }
 
-  PublishResult publish(String streamName, String stringEvent, String partitionKey) {
+  ListenableFuture<Boolean> publish(String streamName, Event event) {
     if (configuration.isSendAsync()) {
-      return publishAsync(streamName, stringEvent, partitionKey);
+      return publishAsync(streamName, gson.toJson(event), event.getType());
     }
-    return publishSync(streamName, stringEvent, partitionKey);
+    return publishSync(streamName, gson.toJson(event), event.getType());
   }
 
-  private PublishResult publishSync(String streamName, String stringEvent, String partitionKey) {
-    logger.atFiner().log(
-        "KINESIS PRODUCER - Attempt to publish event %s to stream %s [PK: %s]",
-        stringEvent, streamName, partitionKey);
-
-    UserRecordResult result = null;
+  private ListenableFuture<Boolean> publishSync(
+      String streamName, String stringEvent, String partitionKey) {
+    SettableFuture<Boolean> resultFuture = SettableFuture.create();
     try {
-      result =
-          kinesisProducer
-              .addUserRecord(streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()))
-              .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS);
-
-      List<Attempt> attemptsDetails = result.getAttempts();
-      int numberOfAttempts = attemptsDetails.size();
-      if (result.isSuccessful()) {
-        logger.atFine().log(
-            "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
-            stringEvent,
-            result.getShardId(),
-            partitionKey,
-            result.getSequenceNumber(),
-            numberOfAttempts);
-        return PublishResult.success(numberOfAttempts);
-      } else {
-        int currentIdx = numberOfAttempts - 1;
-        int previousIdx = currentIdx - 1;
-        Attempt current = attemptsDetails.get(currentIdx);
-        if (previousIdx >= 0) {
-          Attempt previous = attemptsDetails.get(previousIdx);
-          logger.atSevere().log(
-              String.format(
-                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s. Previous failure - %s : %s",
-                  stringEvent,
-                  partitionKey,
-                  current.getErrorCode(),
-                  current.getErrorMessage(),
-                  previous.getErrorCode(),
-                  previous.getErrorMessage()));
-        } else {
-          logger.atSevere().log(
-              String.format(
-                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s.",
-                  stringEvent, partitionKey, current.getErrorCode(), current.getErrorMessage()));
-        }
-      }
-    } catch (InterruptedException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Interrupted publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
-    } catch (ExecutionException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Error when publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
-    } catch (TimeoutException e) {
-      logger.atSevere().withCause(e).log(
-          String.format(
-              "KINESIS PRODUCER - Timeout when publishing event '%s' [PK: %s]",
-              stringEvent, partitionKey));
+      resultFuture.set(
+          publishAsync(streamName, stringEvent, partitionKey)
+              .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS));
+    } catch (CancellationException
+        | ExecutionException
+        | InterruptedException
+        | TimeoutException futureException) {
+      logger.atSevere().withCause(futureException).log(
+          "KINESIS PRODUCER - Failed publishing event %s [PK: %s]", stringEvent, partitionKey);
+      resultFuture.set(false);
     }
 
-    return PublishResult.failure(
-        Optional.ofNullable(result).map(r -> r.getAttempts().size()).orElse(0));
+    return resultFuture;
   }
 
-  private PublishResult publishAsync(String streamName, String stringEvent, String partitionKey) {
+  private ListenableFuture<Boolean> publishAsync(
+      String streamName, String stringEvent, String partitionKey) {
     try {
       ListenableFuture<UserRecordResult> publishF =
           kinesisProducer.addUserRecord(
@@ -162,12 +116,13 @@
             }
           },
           callBackExecutor);
+
+      return Futures.transform(
+          publishF, res -> res != null && res.isSuccessful(), callBackExecutor);
     } catch (Exception e) {
       logger.atSevere().withCause(e).log(
           "KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
-      return PublishResult.failure(1);
+      return Futures.immediateFailedFuture(e);
     }
-
-    return PublishResult.success(1);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index 5d50870..749ea2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,18 +14,13 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import static java.util.Objects.requireNonNull;
-
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import java.util.UUID;
 import java.util.function.Consumer;
 import software.amazon.kinesis.exceptions.InvalidStateException;
 import software.amazon.kinesis.exceptions.ShutdownException;
@@ -38,20 +33,22 @@
 
 class KinesisRecordProcessor implements ShardRecordProcessor {
   interface Factory {
-    KinesisRecordProcessor create(Consumer<EventMessage> recordProcessor);
+    KinesisRecordProcessor create(Consumer<Event> recordProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private final Consumer<EventMessage> recordProcessor;
+  private final Consumer<Event> recordProcessor;
   private final OneOffRequestContext oneOffCtx;
-  private final Gson gson;
+  private final EventDeserializer eventDeserializer;
 
   @Inject
   KinesisRecordProcessor(
-      @Assisted Consumer<EventMessage> recordProcessor, OneOffRequestContext oneOffCtx, Gson gson) {
+      @Assisted Consumer<Event> recordProcessor,
+      OneOffRequestContext oneOffCtx,
+      EventDeserializer eventDeserializer) {
     this.recordProcessor = recordProcessor;
     this.oneOffCtx = oneOffCtx;
-    this.gson = gson;
+    this.eventDeserializer = eventDeserializer;
   }
 
   @Override
@@ -76,7 +73,7 @@
                 String jsonMessage = new String(byteRecord);
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage eventMessage = deserialise(jsonMessage);
+                  Event eventMessage = eventDeserializer.deserialize(jsonMessage);
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -87,23 +84,6 @@
     }
   }
 
-  private EventMessage deserialise(String json) {
-    EventMessage result = gson.fromJson(json, EventMessage.class);
-    if (result.getEvent() == null && result.getHeader() == null) {
-      Event event = deserialiseEvent(json);
-      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
-    }
-    result.validate();
-    return result;
-  }
-
-  private Event deserialiseEvent(String json) {
-    Event event = gson.fromJson(json, Event.class);
-    requireNonNull(event.type, "Event type cannot be null");
-    requireNonNull(event.instanceId, "Event instance id cannot be null");
-    return event;
-  }
-
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
     logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
index 557ad0f..8571a1f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.util.function.Consumer;
@@ -23,16 +23,15 @@
 
 class KinesisRecordProcessorFactory implements ShardRecordProcessorFactory {
   interface Factory {
-    KinesisRecordProcessorFactory create(Consumer<EventMessage> recordProcessor);
+    KinesisRecordProcessorFactory create(Consumer<Event> recordProcessor);
   }
 
-  private final Consumer<EventMessage> recordProcessor;
+  private final Consumer<Event> recordProcessor;
   private final KinesisRecordProcessor.Factory processorFactory;
 
   @Inject
   KinesisRecordProcessorFactory(
-      @Assisted Consumer<EventMessage> recordProcessor,
-      KinesisRecordProcessor.Factory processorFactory) {
+      @Assisted Consumer<Event> recordProcessor, KinesisRecordProcessor.Factory processorFactory) {
     this.recordProcessor = recordProcessor;
     this.processorFactory = processorFactory;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index b503d0e..10d752c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -18,7 +18,6 @@
 
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.events.LifecycleListener;
@@ -26,10 +25,8 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.events.EventListener;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
-import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -79,7 +76,6 @@
     factory(SchedulerProvider.Factory.class);
     bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
     DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
-    bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
     DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
     factory(KinesisConsumer.Factory.class);
     DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
deleted file mode 100644
index 202f3e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kinesis;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class PublishResult {
-  public abstract boolean isSuccess();
-
-  public abstract int attempts();
-
-  public static PublishResult success(int attempts) {
-    return new AutoValue_PublishResult(true, attempts);
-  }
-
-  public static PublishResult failure(int attempts) {
-    return new AutoValue_PublishResult(false, attempts);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 19079bf..4c59b49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.kinesis.Configuration.cosumerLeaseName;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Provider;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
@@ -35,7 +35,7 @@
     SchedulerProvider create(
         String streamName,
         boolean fromBeginning,
-        java.util.function.Consumer<EventMessage> messageProcessor);
+        java.util.function.Consumer<Event> messageProcessor);
   }
 
   private final ConfigsBuilder configsBuilder;
@@ -53,7 +53,7 @@
       KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
       @Assisted String streamName,
       @Assisted boolean fromBeginning,
-      @Assisted java.util.function.Consumer<EventMessage> messageProcessor) {
+      @Assisted java.util.function.Consumer<Event> messageProcessor) {
     this.configuration = configuration;
     this.kinesisAsyncClient = kinesisAsyncClient;
     this.streamName = streamName;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 1ff7faa..f70a5be 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -15,21 +15,26 @@
 package com.googlesource.gerrit.plugins.kinesis;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.WaitUtil;
 import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,8 +56,10 @@
   // lease on the newly created stream
   private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
   private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
+  private static final long SEND_TIMEOUT_MILLIS = 200;
 
   private static final int LOCALSTACK_PORT = 4566;
+
   private LocalStackContainer localstack =
       new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.17.5"))
           .withServices(DYNAMODB, KINESIS, CLOUDWATCH)
@@ -106,11 +113,30 @@
     EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
     kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
 
-    kinesisBroker().send(streamName, eventMessage());
+    Event event = eventMessage();
+    kinesisBroker().send(streamName, event);
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldConsumeAnEventWithoutInstanceId() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+
+    Event event = eventMessage();
+    event.instanceId = null;
+
+    kinesisBroker().send(streamName, event);
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
   }
 
   @Test
@@ -123,13 +149,12 @@
     EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
     kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
 
-    EventMessage event = eventMessage();
+    Event event = eventMessage();
     kinesisBroker().send(streamName, event);
 
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(event.getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
 
     eventConsumerCounter.clear();
     kinesisBroker().disconnect();
@@ -138,8 +163,7 @@
 
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(event.getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
   }
 
   @Test
@@ -147,48 +171,52 @@
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
-  public void sendingSynchronouslyShouldRetryUntilSuccessful() {
+  public void sendingSynchronouslyShouldBeSuccessful()
+      throws InterruptedException, ExecutionException {
     String streamName = UUID.randomUUID().toString();
     createStreamAsync(streamName);
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
-    assertThat(publishResult.attempts()).isGreaterThan(1);
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get()).isTrue();
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
-  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut() {
+  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut()
+      throws InterruptedException, ExecutionException, TimeoutException {
     String streamName = "not-existing-stream";
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isFalse();
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isFalse();
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
-  public void sendingAsynchronouslyShouldBeImmediatelySuccessfulEvenWhenStreamDoesNotExist() {
+  public void sendingAsynchronouslyShouldFailWhenStreamDoesNotExist() {
     String streamName = "not-existing-stream";
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
+    assertThat(executionException)
+        .hasMessageThat()
+        .contains("com.amazonaws.services.kinesis.producer.UserRecordFailedException");
   }
 
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
-  public void sendingAsynchronouslyShouldBeImmediatelySuccessful() {
+  public void sendingAsynchronouslyShouldBeSuccessful()
+      throws InterruptedException, ExecutionException {
     String streamName = UUID.randomUUID().toString();
     createStreamAsync(streamName);
 
-    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
-    assertThat(publishResult.isSuccess()).isTrue();
-    assertThat(publishResult.attempts()).isEqualTo(1);
+    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+    assertThat(result.get()).isTrue();
   }
 
   public KinesisBrokerApi kinesisBroker() {
@@ -214,20 +242,27 @@
         CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
   }
 
-  private EventMessage eventMessage() {
-    return new EventMessage(
-        new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+  private Event eventMessage() {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = "instance-id";
+    return event;
   }
 
-  private static class EventConsumerCounter implements Consumer<EventMessage> {
-    List<EventMessage> consumedMessages = new ArrayList<>();
+  private void compareEvents(Event event, Event expectedEvent) {
+    assertThat(event.type).isEqualTo(expectedEvent.type);
+    assertThat(event.eventCreatedOn).isEqualTo(expectedEvent.eventCreatedOn);
+    assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
+  }
+
+  private static class EventConsumerCounter implements Consumer<Event> {
+    List<Event> consumedMessages = new ArrayList<>();
 
     @Override
-    public void accept(EventMessage eventMessage) {
+    public void accept(Event eventMessage) {
       consumedMessages.add(eventMessage);
     }
 
-    public List<EventMessage> getConsumedMessages() {
+    public List<Event> getConsumedMessages() {
       return consumedMessages;
     }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 2401034..d488ab8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -18,12 +18,13 @@
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
@@ -47,29 +48,29 @@
 public class KinesisRecordProcessorTest {
   private KinesisRecordProcessor objectUnderTest;
   private Gson gson = new EventGsonProvider().get();
+  private EventDeserializer eventDeserializer = new EventDeserializer(gson);
 
-  @Mock Consumer<EventMessage> succeedingConsumer;
-  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
+  @Mock Consumer<Event> succeedingConsumer;
+  @Captor ArgumentCaptor<Event> eventMessageCaptor;
   @Mock OneOffRequestContext oneOffCtx;
   @Mock ManualRequestContext requestContext;
 
   @Before
   public void setup() {
     when(oneOffCtx.open()).thenReturn(requestContext);
-    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer);
   }
 
   @Test
   public void shouldSkipEventWithoutSourceInstanceId() {
     Event event = new ProjectCreatedEvent();
-    EventMessage messageWithoutSourceInstanceId =
-        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+    event.instanceId = UUID.randomUUID().toString();
 
-    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
 
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+    verify(succeedingConsumer, never()).accept(event);
   }
 
   @Test
@@ -84,19 +85,19 @@
 
     verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
 
-    EventMessage result = eventMessageCaptor.getValue();
-    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+    Event result = eventMessageCaptor.getValue();
+    assertThat(result.instanceId).isEqualTo(instanceId);
   }
 
   @Test
-  public void shouldSkipEventObjectWithoutInstanceId() {
+  public void shouldProcessEventObjectWithoutInstanceId() {
     Event event = new ProjectCreatedEvent();
     event.instanceId = null;
 
     ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, never()).accept(any());
+    verify(succeedingConsumer, times(1)).accept(any());
   }
 
   @Test
@@ -141,7 +142,7 @@
     ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+    verify(succeedingConsumer, only()).accept(any(Event.class));
   }
 
   private ProcessRecordsInput sampleMessage(String message) {