Send/receive Event object instead of EventMessage

Event object contains instance id populated by Gerrit Core. Workaround
with EventMessage using sourceInstanceId field to recognise the event
source node is not needed anymore. Use Event object instead of
EventMessage.

Bug: Issue 14390
Change-Id: I7bb13eeac0e1c49c511a55e186276e7d03069673
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0bb8f8a..388a15e 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.2",
-        sha1 = "8a56300ce92c3e25b4669a0511b4c520b34851b2",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 36e9f60..9846850 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -15,9 +15,9 @@
 package com.googlesource.gerrit.plugins.kinesis;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -46,13 +46,12 @@
   }
 
   @Override
-  public ListenableFuture<Boolean> send(String streamName, EventMessage event) {
-    return kinesisPublisher.publish(
-        streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
+  public ListenableFuture<Boolean> send(String streamName, Event event) {
+    return kinesisPublisher.publish(streamName, gson.toJson(event), event.instanceId);
   }
 
   @Override
-  public void receiveAsync(String streamName, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String streamName, Consumer<Event> eventConsumer) {
     KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
     consumers.add(consumer);
     consumer.subscribe(streamName, eventConsumer);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index 4a610b5..b563e26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -26,7 +26,7 @@
 
 class KinesisConsumer {
   interface Factory {
-    KinesisConsumer create(String topic, Consumer<EventMessage> messageProcessor);
+    KinesisConsumer create(String topic, Consumer<Event> messageProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -36,7 +36,7 @@
   private final ExecutorService executor;
   private Scheduler kinesisScheduler;
 
-  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private java.util.function.Consumer<Event> messageProcessor;
   private String streamName;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
@@ -52,8 +52,7 @@
     this.executor = executor;
   }
 
-  public void subscribe(
-      String streamName, java.util.function.Consumer<EventMessage> messageProcessor) {
+  public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
     this.streamName = streamName;
     this.messageProcessor = messageProcessor;
 
@@ -61,7 +60,7 @@
     runReceiver(messageProcessor);
   }
 
-  private void runReceiver(java.util.function.Consumer<EventMessage> messageProcessor) {
+  private void runReceiver(java.util.function.Consumer<Event> messageProcessor) {
     this.kinesisScheduler =
         schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
     executor.execute(kinesisScheduler);
@@ -81,7 +80,7 @@
     logger.atInfo().log("Shutdown kinesis consumer of stream %s completed.", getStreamName());
   }
 
-  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+  public java.util.function.Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index 1357e44..5b93b4c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -15,8 +15,8 @@
 package com.googlesource.gerrit.plugins.kinesis;
 
 import com.gerritforge.gerrit.eventbroker.EventDeserializer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
@@ -33,17 +33,17 @@
 
 class KinesisRecordProcessor implements ShardRecordProcessor {
   interface Factory {
-    KinesisRecordProcessor create(Consumer<EventMessage> recordProcessor);
+    KinesisRecordProcessor create(Consumer<Event> recordProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private final Consumer<EventMessage> recordProcessor;
+  private final Consumer<Event> recordProcessor;
   private final OneOffRequestContext oneOffCtx;
   private final EventDeserializer eventDeserializer;
 
   @Inject
   KinesisRecordProcessor(
-      @Assisted Consumer<EventMessage> recordProcessor,
+      @Assisted Consumer<Event> recordProcessor,
       OneOffRequestContext oneOffCtx,
       EventDeserializer eventDeserializer) {
     this.recordProcessor = recordProcessor;
@@ -73,7 +73,7 @@
                 String jsonMessage = new String(byteRecord);
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage eventMessage = eventDeserializer.deserialize(jsonMessage);
+                  Event eventMessage = eventDeserializer.deserialize(jsonMessage);
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
index 557ad0f..8571a1f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.util.function.Consumer;
@@ -23,16 +23,15 @@
 
 class KinesisRecordProcessorFactory implements ShardRecordProcessorFactory {
   interface Factory {
-    KinesisRecordProcessorFactory create(Consumer<EventMessage> recordProcessor);
+    KinesisRecordProcessorFactory create(Consumer<Event> recordProcessor);
   }
 
-  private final Consumer<EventMessage> recordProcessor;
+  private final Consumer<Event> recordProcessor;
   private final KinesisRecordProcessor.Factory processorFactory;
 
   @Inject
   KinesisRecordProcessorFactory(
-      @Assisted Consumer<EventMessage> recordProcessor,
-      KinesisRecordProcessor.Factory processorFactory) {
+      @Assisted Consumer<Event> recordProcessor, KinesisRecordProcessor.Factory processorFactory) {
     this.recordProcessor = recordProcessor;
     this.processorFactory = processorFactory;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 19079bf..4c59b49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -16,7 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.kinesis.Configuration.cosumerLeaseName;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Provider;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
@@ -35,7 +35,7 @@
     SchedulerProvider create(
         String streamName,
         boolean fromBeginning,
-        java.util.function.Consumer<EventMessage> messageProcessor);
+        java.util.function.Consumer<Event> messageProcessor);
   }
 
   private final ConfigsBuilder configsBuilder;
@@ -53,7 +53,7 @@
       KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
       @Assisted String streamName,
       @Assisted boolean fromBeginning,
-      @Assisted java.util.function.Consumer<EventMessage> messageProcessor) {
+      @Assisted java.util.function.Consumer<Event> messageProcessor) {
     this.configuration = configuration;
     this.kinesisAsyncClient = kinesisAsyncClient;
     this.streamName = streamName;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index d7e4d07..6b0eeb4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -21,12 +21,12 @@
 import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.WaitUtil;
 import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -113,11 +113,11 @@
     EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
     kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
 
-    kinesisBroker().send(streamName, eventMessage());
+    Event event = eventMessage();
+    kinesisBroker().send(streamName, event);
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
   }
 
   @Test
@@ -130,13 +130,12 @@
     EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
     kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
 
-    EventMessage event = eventMessage();
+    Event event = eventMessage();
     kinesisBroker().send(streamName, event);
 
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(event.getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
 
     eventConsumerCounter.clear();
     kinesisBroker().disconnect();
@@ -145,8 +144,7 @@
 
     WaitUtil.waitUntil(
         () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
-    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
-        .isEqualTo(event.getHeader().eventId);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
   }
 
   @Test
@@ -225,20 +223,27 @@
         CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
   }
 
-  private EventMessage eventMessage() {
-    return new EventMessage(
-        new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+  private Event eventMessage() {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = "instance-id";
+    return event;
   }
 
-  private static class EventConsumerCounter implements Consumer<EventMessage> {
-    List<EventMessage> consumedMessages = new ArrayList<>();
+  private void compareEvents(Event event, Event expectedEvent) {
+    assertThat(event.type).isEqualTo(expectedEvent.type);
+    assertThat(event.eventCreatedOn).isEqualTo(expectedEvent.eventCreatedOn);
+    assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
+  }
+
+  private static class EventConsumerCounter implements Consumer<Event> {
+    List<Event> consumedMessages = new ArrayList<>();
 
     @Override
-    public void accept(EventMessage eventMessage) {
+    public void accept(Event eventMessage) {
       consumedMessages.add(eventMessage);
     }
 
-    public List<EventMessage> getConsumedMessages() {
+    public List<Event> getConsumedMessages() {
       return consumedMessages;
     }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 4d6c246..d488ab8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -18,11 +18,11 @@
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.EventDeserializer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
@@ -50,8 +50,8 @@
   private Gson gson = new EventGsonProvider().get();
   private EventDeserializer eventDeserializer = new EventDeserializer(gson);
 
-  @Mock Consumer<EventMessage> succeedingConsumer;
-  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
+  @Mock Consumer<Event> succeedingConsumer;
+  @Captor ArgumentCaptor<Event> eventMessageCaptor;
   @Mock OneOffRequestContext oneOffCtx;
   @Mock ManualRequestContext requestContext;
 
@@ -64,14 +64,13 @@
   @Test
   public void shouldSkipEventWithoutSourceInstanceId() {
     Event event = new ProjectCreatedEvent();
-    EventMessage messageWithoutSourceInstanceId =
-        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+    event.instanceId = UUID.randomUUID().toString();
 
-    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
 
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+    verify(succeedingConsumer, never()).accept(event);
   }
 
   @Test
@@ -86,19 +85,19 @@
 
     verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
 
-    EventMessage result = eventMessageCaptor.getValue();
-    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+    Event result = eventMessageCaptor.getValue();
+    assertThat(result.instanceId).isEqualTo(instanceId);
   }
 
   @Test
-  public void shouldSkipEventObjectWithoutInstanceId() {
+  public void shouldProcessEventObjectWithoutInstanceId() {
     Event event = new ProjectCreatedEvent();
     event.instanceId = null;
 
     ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, never()).accept(any());
+    verify(succeedingConsumer, times(1)).accept(any());
   }
 
   @Test
@@ -143,7 +142,7 @@
     ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
     objectUnderTest.processRecords(kinesisInput);
 
-    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+    verify(succeedingConsumer, only()).accept(any(Event.class));
   }
 
   private ProcessRecordsInput sampleMessage(String message) {