Send/receive Event object instead of EventMessage

Event object contains instance id populated by Gerrit Core. Workaround
with EventMessage using sourceInstanceId field to recognise the event
source node is not needed anymore. Use Event object instead of
EventMessage.

Bug: Issue 14390
Change-Id: Ia7a6bc2f3cae58cdd56c92785a185616c4b0ebf2
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0ed499d..c5b11fe 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -45,8 +45,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.2",
-        sha1 = "8a56300ce92c3e25b4669a0511b4c520b34851b2",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
index 7d07a5f..17233b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -15,10 +15,10 @@
 package com.googlesource.gerrit.plugins.pubsub;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Map;
@@ -43,12 +43,12 @@
   }
 
   @Override
-  public ListenableFuture<Boolean> send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, Event message) {
     return publishers.computeIfAbsent(topic, t -> publisherFactory.create(t)).publish(message);
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
     PubSubEventSubscriber subscriber = subscriberFactory.create(topic, eventConsumer);
     subscribers.add(subscriber);
     subscriber.subscribe();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index faeb218..fb3daaf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -15,12 +15,12 @@
 package com.googlesource.gerrit.plugins.pubsub;
 
 import com.gerritforge.gerrit.eventbroker.EventDeserializer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.pubsub.v1.PubsubMessage;
@@ -32,7 +32,7 @@
 public class PubSubEventSubscriber {
 
   public interface Factory {
-    public PubSubEventSubscriber create(String topic, Consumer<EventMessage> messageProcessor);
+    public PubSubEventSubscriber create(String topic, Consumer<Event> messageProcessor);
   }
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -40,7 +40,7 @@
   private final EventDeserializer eventsDeserializer;
   private final PubSubSubscriberMetrics subscriberMetrics;
   private final String topic;
-  private final Consumer<EventMessage> messageProcessor;
+  private final Consumer<Event> messageProcessor;
   private final SubscriberProvider subscriberProvider;
   private final PubSubConfiguration config;
   private Subscriber subscriber;
@@ -52,7 +52,7 @@
       PubSubConfiguration config,
       PubSubSubscriberMetrics subscriberMetrics,
       @Assisted String topic,
-      @Assisted Consumer<EventMessage> messageProcessor) {
+      @Assisted Consumer<Event> messageProcessor) {
     this.eventsDeserializer = eventsDeserializer;
     this.subscriberMetrics = subscriberMetrics;
     this.topic = topic;
@@ -78,7 +78,7 @@
     return topic;
   }
 
-  public Consumer<EventMessage> getMessageProcessor() {
+  public Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
 
@@ -100,7 +100,7 @@
   MessageReceiver getMessageReceiver() {
     return (PubsubMessage message, AckReplyConsumer consumer) -> {
       try {
-        EventMessage event = eventsDeserializer.deserialize(message.getData().toStringUtf8());
+        Event event = eventsDeserializer.deserialize(message.getData().toStringUtf8());
         messageProcessor.accept(event);
         subscriberMetrics.incrementSucceedToConsumeMessage();
       } catch (Exception e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
index 4506217..d202f25 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -19,7 +19,6 @@
 import static java.util.stream.Collectors.groupingBy;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.api.gax.core.NoCredentialsProvider;
 import com.google.api.gax.grpc.GrpcTransportChannel;
 import com.google.api.gax.rpc.FixedTransportChannelProvider;
@@ -55,7 +54,6 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.Consumer;
 import org.junit.Test;
 import org.testcontainers.containers.PubSubEmulatorContainer;
@@ -72,6 +70,7 @@
 
   private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
   private static final String PRIVATE_KEY_LOCATION = "not used in test";
+  private static final String DEFAULT_INSTANCE_ID = "instance-id";
 
   @Inject @EventGson private Gson gson;
 
@@ -116,12 +115,11 @@
       value = PRIVATE_KEY_LOCATION)
   public void shouldSendEvent() throws IOException {
     createSubscription(SUBSCRIPTION_ID, TOPIC_ID, channelProvider, credentialsProvider);
-    UUID id = UUID.randomUUID();
     Event event = new ProjectCreatedEvent();
-    EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
-    String expectedMessageJson = gson.toJson(eventMessage);
+    event.instanceId = DEFAULT_INSTANCE_ID;
+    String expectedMessageJson = gson.toJson(event);
 
-    objectUnderTest.send(TOPIC_ID, eventMessage);
+    objectUnderTest.send(TOPIC_ID, event);
 
     readMessageAndValidate(
         (pullResponse) -> {
@@ -168,15 +166,14 @@
       name = "plugin.events-gcloud-pubsub.privateKeyLocation",
       value = PRIVATE_KEY_LOCATION)
   public void shouldConsumeEvent() throws InterruptedException {
-    UUID id = UUID.randomUUID();
     Event event = new ProjectCreatedEvent();
-    EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
-    String expectedMessageJson = gson.toJson(eventMessage);
+    event.instanceId = DEFAULT_INSTANCE_ID;
+    String expectedMessageJson = gson.toJson(event);
     TestConsumer consumer = new TestConsumer();
 
     objectUnderTest.receiveAsync(TOPIC_ID, consumer);
 
-    objectUnderTest.send(TOPIC_ID, eventMessage);
+    objectUnderTest.send(TOPIC_ID, event);
 
     WaitUtil.waitUntil(
         () ->
@@ -248,15 +245,15 @@
         .getName();
   }
 
-  private class TestConsumer implements Consumer<EventMessage> {
-    private EventMessage msg;
+  private class TestConsumer implements Consumer<Event> {
+    private Event msg;
 
     @Override
-    public void accept(EventMessage msg) {
+    public void accept(Event msg) {
       this.msg = msg;
     }
 
-    public EventMessage getMessage() {
+    public Event getMessage() {
       return msg;
     }
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index fa85120..c144d3b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -21,7 +21,6 @@
 import static org.mockito.Mockito.verify;
 
 import com.gerritforge.gerrit.eventbroker.EventDeserializer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.gerrit.server.events.Event;
@@ -30,7 +29,6 @@
 import com.google.gson.Gson;
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
-import java.util.UUID;
 import java.util.function.Consumer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,19 +44,17 @@
   @Mock SubscriberProvider subscriberProviderMock;
   @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
   @Mock AckReplyConsumer ackReplyConsumerMock;
-  @Mock Consumer<EventMessage> succeedingConsumer;
-  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
+  @Mock Consumer<Event> succeedingConsumer;
+  @Captor ArgumentCaptor<Event> eventMessageCaptor;
 
   private static final String TOPIC = "foo";
-  private static final EventMessage eventMessage =
-      new EventMessage(
-          new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+
   private Gson gson = new EventGsonProvider().get();
   private EventDeserializer deserializer = new EventDeserializer(gson);
 
   @Test
   public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() {
-    Consumer<EventMessage> failingConsumer =
+    Consumer<Event> failingConsumer =
         (message) -> {
           throw new RuntimeException("Error receiving message");
         };
@@ -71,7 +67,10 @@
 
   @Test
   public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() {
-    PubsubMessage pubsubMessage = sampleMessage();
+    String instanceId = "instance-id";
+    Event eventMessage = new ProjectCreatedEvent();
+    eventMessage.instanceId = instanceId;
+    PubsubMessage pubsubMessage = sampleMessage(eventMessage);
 
     messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
 
@@ -80,28 +79,25 @@
 
   @Test
   public void shouldSkipEventWithoutSourceInstanceId() {
-    Event event = new ProjectCreatedEvent();
-    EventMessage messageWithoutSourceInstanceId =
-        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
-    PubsubMessage pubsubMessage = sampleMessage(messageWithoutSourceInstanceId);
+    Event eventWithoutSourceInstanceId = new ProjectCreatedEvent();
+    PubsubMessage pubsubMessage = sampleMessage(eventWithoutSourceInstanceId);
 
     messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
 
-    verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+    verify(succeedingConsumer, never()).accept(eventWithoutSourceInstanceId);
   }
 
   @Test
   public void shouldParseEventObject() {
     String instanceId = "instance-id";
-
     Event event = new ProjectCreatedEvent();
     event.instanceId = instanceId;
     PubsubMessage pubsubMessage = sampleMessage(event);
     messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
 
     verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
-    EventMessage result = eventMessageCaptor.getValue();
-    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+    Event result = eventMessageCaptor.getValue();
+    assertThat(result.instanceId).isEqualTo(instanceId);
   }
 
   @Test
@@ -112,7 +108,7 @@
     PubsubMessage pubsubMessage = sampleMessage(event);
     messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
 
-    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+    verify(succeedingConsumer, only()).accept(any(Event.class));
   }
 
   private PubsubMessage sampleMessage(Event event) {
@@ -121,17 +117,7 @@
     return PubsubMessage.newBuilder().setData(data).build();
   }
 
-  private PubsubMessage sampleMessage(EventMessage message) {
-    String eventPayload = gson.toJson(message);
-    ByteString data = ByteString.copyFromUtf8(eventPayload);
-    return PubsubMessage.newBuilder().setData(data).build();
-  }
-
-  private PubsubMessage sampleMessage() {
-    return sampleMessage(eventMessage);
-  }
-
-  private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) {
+  private MessageReceiver messageReceiver(Consumer<Event> consumer) {
     return new PubSubEventSubscriber(
             deserializer,
             subscriberProviderMock,