Send/receive Event object instead of EventMessage

The Event object carries the instance id populated by Gerrit Core, so
the workaround of wrapping events in EventMessage and using its
sourceInstanceId field to recognise the event source node is no longer
needed. Send and receive Event objects directly instead of
EventMessage.
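
A minimal sketch of how a caller publishes and consumes events through
BrokerApi after this change; the brokerApiItem handle, the topic name
and the instance id value are illustrative assumptions, not part of
this change:

    import com.gerritforge.gerrit.eventbroker.BrokerApi;
    import com.google.gerrit.server.events.Event;
    import com.google.gerrit.server.events.ProjectCreatedEvent;

    // Event.instanceId is normally populated by Gerrit Core for locally
    // generated events; it is set explicitly here only for illustration.
    Event event = new ProjectCreatedEvent();
    event.instanceId = "instance-1";

    // brokerApiItem is an injected DynamicItem<BrokerApi> (assumed name).
    BrokerApi brokerApi = brokerApiItem.get();
    brokerApi.send("gerrit_events", event); // returns ListenableFuture<Boolean>
    brokerApi.receiveAsync("gerrit_events",
        e -> System.out.println("received from " + e.instanceId));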

Bug: Issue 14390
Change-Id: I104e573f4f38651f51b9304ae1d06534419a3ee0
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bcb1675..e444876 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.2",
-        sha1 = "8a56300ce92c3e25b4669a0511b4c520b34851b2",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 73c7509..a128af8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,11 +15,11 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
@@ -61,7 +61,7 @@
             workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber"));
 
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class);
+    bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
     bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
 
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 7eabf2d..3ec21e0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -15,9 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -43,12 +43,12 @@
   }
 
   @Override
-  public ListenableFuture<Boolean> send(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> send(String topic, Event event) {
     return publisher.publish(topic, event);
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
     KafkaEventSubscriber subscriber = subscriberProvider.get();
     synchronized (subscribers) {
       subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index e46a818..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.kafka.publish;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
@@ -55,11 +54,11 @@
     }
   }
 
-  public ListenableFuture<Boolean> publish(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> publish(String topic, Event event) {
     return session.publish(topic, getPayload(event));
   }
 
-  private String getPayload(EventMessage event) {
+  private String getPayload(Event event) {
     return gson.toJson(event);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index 98b1cf2..cad2f37 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -15,7 +15,7 @@
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
 import com.gerritforge.gerrit.eventbroker.EventDeserializer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.Map;
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 @Singleton
-public class KafkaEventDeserializer implements Deserializer<EventMessage> {
+public class KafkaEventDeserializer implements Deserializer<Event> {
 
   private final StringDeserializer stringDeserializer = new StringDeserializer();
   private EventDeserializer eventDeserializer;
@@ -41,7 +41,7 @@
   public void configure(Map<String, ?> configs, boolean isKey) {}
 
   @Override
-  public EventMessage deserialize(String topic, byte[] data) {
+  public Event deserialize(String topic, byte[] data) {
     String json = stringDeserializer.deserialize(topic, data);
     return eventDeserializer.deserialize(json);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 7ef9d7b..90b82aa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -15,8 +15,8 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
@@ -39,14 +39,14 @@
   private final OneOffRequestContext oneOffCtx;
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final Deserializer<EventMessage> valueDeserializer;
+  private final Deserializer<Event> valueDeserializer;
   private final KafkaSubscriberProperties configuration;
   private final ExecutorService executor;
   private final KafkaEventSubscriberMetrics subscriberMetrics;
   private final KafkaConsumerFactory consumerFactory;
   private final Deserializer<byte[]> keyDeserializer;
 
-  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private java.util.function.Consumer<Event> messageProcessor;
   private String topic;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
@@ -57,7 +57,7 @@
       KafkaSubscriberProperties configuration,
       KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
-      Deserializer<EventMessage> valueDeserializer,
+      Deserializer<Event> valueDeserializer,
       OneOffRequestContext oneOffCtx,
       @ConsumerExecutor ExecutorService executor,
       KafkaEventSubscriberMetrics subscriberMetrics) {
@@ -71,7 +71,7 @@
     this.valueDeserializer = valueDeserializer;
   }
 
-  public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+  public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     logger.atInfo().log(
@@ -97,7 +97,7 @@
     receiver.wakeup();
   }
 
-  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+  public java.util.function.Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
 
@@ -146,7 +146,7 @@
           consumerRecords.forEach(
               consumerRecord -> {
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage event =
+                  Event event =
                       valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
                   messageProcessor.accept(event);
                 } catch (Exception e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 7ae9279..bd47223 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,7 +19,6 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -40,7 +39,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -141,14 +139,12 @@
   @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
   public void shouldReplayAllEvents() throws InterruptedException {
     String topic = "a_topic";
-    EventMessage eventMessage =
-        new EventMessage(
-            new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()),
-            new ProjectCreatedEvent());
+    Event eventMessage = new ProjectCreatedEvent();
+    eventMessage.instanceId = "test-instance-id";
 
     Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofMillis(1000);
 
-    List<EventMessage> receivedEvents = new ArrayList<>();
+    List<Event> receivedEvents = new ArrayList<>();
 
     BrokerApi kafkaBrokerApi = kafkaBrokerApi();
     kafkaBrokerApi.send(topic, eventMessage);
@@ -157,14 +153,12 @@
 
     waitUntil(() -> receivedEvents.size() == 1, WAIT_FOR_POLL_TIMEOUT);
 
-    assertThat(receivedEvents.get(0).getHeader().eventId)
-        .isEqualTo(eventMessage.getHeader().eventId);
+    assertThat(receivedEvents.get(0).instanceId).isEqualTo(eventMessage.instanceId);
 
     kafkaBrokerApi.replayAllEvents(topic);
     waitUntil(() -> receivedEvents.size() == 2, WAIT_FOR_POLL_TIMEOUT);
 
-    assertThat(receivedEvents.get(1).getHeader().eventId)
-        .isEqualTo(eventMessage.getHeader().eventId);
+    assertThat(receivedEvents.get(1).instanceId).isEqualTo(eventMessage.instanceId);
   }
 
   private BrokerApi kafkaBrokerApi() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 2a743dc..d8df198 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,9 +17,8 @@
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
@@ -40,7 +39,6 @@
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -63,7 +61,7 @@
   private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
   private static final int TEST_POLLING_INTERVAL_MSEC = 100;
   private static final int TEST_THREAD_POOL_SIZE = 10;
-  private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+  private static final String TEST_INSTANCE_ID = "test-instance-id";
   private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS;
   private static final int TEST_TIMEOUT = 30;
 
@@ -109,8 +107,8 @@
     }
   }
 
-  public static class TestConsumer implements Consumer<EventMessage> {
-    public final List<EventMessage> messages = new ArrayList<>();
+  public static class TestConsumer implements Consumer<Event> {
+    public final List<Event> messages = new ArrayList<>();
     private final CountDownLatch lock;
 
     public TestConsumer(int numMessagesExpected) {
@@ -118,7 +116,7 @@
     }
 
     @Override
-    public void accept(EventMessage message) {
+    public void accept(Event message) {
       messages.add(message);
       lock.countDown();
     }
@@ -132,13 +130,6 @@
     }
   }
 
-  public static class TestHeader extends Header {
-
-    public TestHeader() {
-      super(UUID.randomUUID(), TEST_INSTANCE_ID);
-    }
-  }
-
   @BeforeClass
   public static void beforeClass() throws Exception {
     kafka = new KafkaContainer();
@@ -180,7 +171,8 @@
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_sync";
     TestConsumer testConsumer = new TestConsumer(1);
-    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
 
     kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
     kafkaBrokerApi.send(testTopic, testEventMessage);
@@ -196,7 +188,8 @@
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_async";
     TestConsumer testConsumer = new TestConsumer(1);
-    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
 
     kafkaBrokerApi.send(testTopic, testEventMessage);
     kafkaBrokerApi.receiveAsync(testTopic, testConsumer);