Merge branch 'stable-3.4'

* stable-3.4:
  Send/receive Event object instead of EventMessage
  Use EventGsonProvider binding from Gerrit core
  Use event deserialization logic from events broker
  Implement async send method as per 3.4.0-rc2 API
  Deserialize Event and EventMessage
  Use EventGsonProvider from Gerrit core
  Fix properties in EventConsumerIT tests

Change-Id: Ia534d42980f0949b3954e0eca23b8a74cc3be3ea
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index e059b34..e444876 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0-rc0",
-        sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2768dd3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,14 +14,11 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
-import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -39,7 +36,6 @@
 
   @Override
   protected void configure() {
-    bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
     DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 73c7509..a128af8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,11 +15,11 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
@@ -61,7 +61,7 @@
             workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber"));
 
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class);
+    bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
     bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
 
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..3ec21e0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -15,8 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,12 +43,12 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> send(String topic, Event event) {
     return publisher.publish(topic, event);
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
     KafkaEventSubscriber subscriber = subscriberProvider.get();
     synchronized (subscribers) {
       subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
deleted file mode 100644
index 2c5c1e7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.publish;
-
-import com.google.common.base.Supplier;
-import com.google.gerrit.server.events.SupplierSerializer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Provider;
-
-public class GsonProvider implements Provider<Gson> {
-
-  @Override
-  public Gson get() {
-    return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,9 +14,10 @@
 
 package com.googlesource.gerrit.plugins.kafka.publish;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -31,7 +32,7 @@
   private final Gson gson;
 
   @Inject
-  public KafkaPublisher(KafkaSession kafkaSession, Gson gson) {
+  public KafkaPublisher(KafkaSession kafkaSession, @EventGson Gson gson) {
     this.session = kafkaSession;
     this.gson = gson;
   }
@@ -53,11 +54,11 @@
     }
   }
 
-  public boolean publish(String topic, EventMessage event) {
+  public ListenableFuture<Boolean> publish(String topic, Event event) {
     return session.publish(topic, getPayload(event));
   }
 
-  private String getPayload(EventMessage event) {
+  private String getPayload(Event event) {
     return gson.toJson(event);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
 
 package com.googlesource.gerrit.plugins.kafka.session;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
     producer = null;
   }
 
-  public void publish(String messageBody) {
-    publish(properties.getTopic(), messageBody);
+  public ListenableFuture<Boolean> publish(String messageBody) {
+    return publish(properties.getTopic(), messageBody);
   }
 
-  public boolean publish(String topic, String messageBody) {
+  public ListenableFuture<Boolean> publish(String topic, String messageBody) {
     if (properties.isSendAsync()) {
       return publishAsync(topic, messageBody);
     }
     return publishSync(topic, messageBody);
   }
 
-  private boolean publishSync(String topic, String messageBody) {
-
+  private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+    SettableFuture<Boolean> resultF = SettableFuture.create();
     try {
       Future<RecordMetadata> future =
           producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
       publisherMetrics.incrementBrokerPublishedMessage();
-      return true;
+      resultF.set(true);
+      return resultF;
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 
-  private boolean publishAsync(String topic, String messageBody) {
+  private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
     try {
       Future<RecordMetadata> future =
           producer.send(
@@ -121,11 +128,16 @@
                   publisherMetrics.incrementBrokerFailedToPublishMessage();
                 }
               });
-      return future != null;
+
+      // The transformation is lightweight, so we can afford to use a directExecutor
+      return Futures.transform(
+          JdkFutureAdapters.listenInPoolThread(future),
+          Objects::nonNull,
+          MoreExecutors.directExecutor());
     } catch (Throwable e) {
       LOGGER.error("Cannot send the message", e);
       publisherMetrics.incrementBrokerFailedToPublishMessage();
-      return false;
+      return Futures.immediateFailedFuture(e);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index bab2ad0..cad2f37 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.gson.Gson;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.Map;
@@ -23,30 +23,27 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 @Singleton
-public class KafkaEventDeserializer implements Deserializer<EventMessage> {
+public class KafkaEventDeserializer implements Deserializer<Event> {
 
   private final StringDeserializer stringDeserializer = new StringDeserializer();
-  private Gson gson;
+  private EventDeserializer eventDeserializer;
 
   // To be used when providing this deserializer with class name (then need to add a configuration
   // entry to set the gson.provider
   public KafkaEventDeserializer() {}
 
   @Inject
-  public KafkaEventDeserializer(Gson gson) {
-    this.gson = gson;
+  public KafkaEventDeserializer(EventDeserializer eventDeserializer) {
+    this.eventDeserializer = eventDeserializer;
   }
 
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {}
 
   @Override
-  public EventMessage deserialize(String topic, byte[] data) {
-    final EventMessage result =
-        gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
-    result.validate();
-
-    return result;
+  public Event deserialize(String topic, byte[] data) {
+    String json = stringDeserializer.deserialize(topic, data);
+    return eventDeserializer.deserialize(json);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 7ef9d7b..90b82aa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -15,8 +15,8 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
@@ -39,14 +39,14 @@
   private final OneOffRequestContext oneOffCtx;
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private final Deserializer<EventMessage> valueDeserializer;
+  private final Deserializer<Event> valueDeserializer;
   private final KafkaSubscriberProperties configuration;
   private final ExecutorService executor;
   private final KafkaEventSubscriberMetrics subscriberMetrics;
   private final KafkaConsumerFactory consumerFactory;
   private final Deserializer<byte[]> keyDeserializer;
 
-  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private java.util.function.Consumer<Event> messageProcessor;
   private String topic;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
@@ -57,7 +57,7 @@
       KafkaSubscriberProperties configuration,
       KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
-      Deserializer<EventMessage> valueDeserializer,
+      Deserializer<Event> valueDeserializer,
       OneOffRequestContext oneOffCtx,
       @ConsumerExecutor ExecutorService executor,
       KafkaEventSubscriberMetrics subscriberMetrics) {
@@ -71,7 +71,7 @@
     this.valueDeserializer = valueDeserializer;
   }
 
-  public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+  public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     logger.atInfo().log(
@@ -97,7 +97,7 @@
     receiver.wakeup();
   }
 
-  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+  public java.util.function.Consumer<Event> getMessageProcessor() {
     return messageProcessor;
   }
 
@@ -146,7 +146,7 @@
           consumerRecords.forEach(
               consumerRecord -> {
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage event =
+                  Event event =
                       valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
                   messageProcessor.accept(event);
                 } catch (Exception e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 4e27585..bd47223 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,8 +19,6 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -33,6 +31,7 @@
 import com.google.gerrit.extensions.common.ChangeMessageInfo;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
@@ -40,7 +39,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -131,24 +129,22 @@
 
   @Test
   @UseLocalDisk
-  @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group")
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
   @GerritConfig(
-      name = "plugin.kafka-events.keyDeserializer",
+      name = "plugin.events-kafka.keyDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
   @GerritConfig(
-      name = "plugin.kafka-events.valueDeserializer",
+      name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
-  @GerritConfig(name = "plugin.kafka-events.pollingIntervalMs", value = "500")
+  @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
   public void shouldReplayAllEvents() throws InterruptedException {
     String topic = "a_topic";
-    EventMessage eventMessage =
-        new EventMessage(
-            new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()),
-            new ProjectCreatedEvent());
+    Event eventMessage = new ProjectCreatedEvent();
+    eventMessage.instanceId = "test-instance-id";
 
     Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofMillis(1000);
 
-    List<EventMessage> receivedEvents = new ArrayList<>();
+    List<Event> receivedEvents = new ArrayList<>();
 
     BrokerApi kafkaBrokerApi = kafkaBrokerApi();
     kafkaBrokerApi.send(topic, eventMessage);
@@ -157,14 +153,12 @@
 
     waitUntil(() -> receivedEvents.size() == 1, WAIT_FOR_POLL_TIMEOUT);
 
-    assertThat(receivedEvents.get(0).getHeader().eventId)
-        .isEqualTo(eventMessage.getHeader().eventId);
+    assertThat(receivedEvents.get(0).instanceId).isEqualTo(eventMessage.instanceId);
 
     kafkaBrokerApi.replayAllEvents(topic);
     waitUntil(() -> receivedEvents.size() == 2, WAIT_FOR_POLL_TIMEOUT);
 
-    assertThat(receivedEvents.get(1).getHeader().eventId)
-        .isEqualTo(eventMessage.getHeader().eventId);
+    assertThat(receivedEvents.get(1).instanceId).isEqualTo(eventMessage.instanceId);
   }
 
   private BrokerApi kafkaBrokerApi() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 48350f9..d8df198 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,10 +17,10 @@
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.util.IdGenerator;
@@ -39,7 +39,6 @@
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -62,7 +61,7 @@
   private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
   private static final int TEST_POLLING_INTERVAL_MSEC = 100;
   private static final int TEST_THREAD_POOL_SIZE = 10;
-  private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+  private static final String TEST_INSTANCE_ID = "test-instance-id";
   private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS;
   private static final int TEST_TIMEOUT = 30;
 
@@ -87,7 +86,10 @@
 
     @Override
     protected void configure() {
-      bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+      bind(Gson.class)
+          .annotatedWith(EventGson.class)
+          .toProvider(EventGsonProvider.class)
+          .in(Singleton.class);
       bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS));
       bind(OneOffRequestContext.class)
           .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
@@ -105,8 +107,8 @@
     }
   }
 
-  public static class TestConsumer implements Consumer<EventMessage> {
-    public final List<EventMessage> messages = new ArrayList<>();
+  public static class TestConsumer implements Consumer<Event> {
+    public final List<Event> messages = new ArrayList<>();
     private final CountDownLatch lock;
 
     public TestConsumer(int numMessagesExpected) {
@@ -114,7 +116,7 @@
     }
 
     @Override
-    public void accept(EventMessage message) {
+    public void accept(Event message) {
       messages.add(message);
       lock.countDown();
     }
@@ -128,13 +130,6 @@
     }
   }
 
-  public static class TestHeader extends Header {
-
-    public TestHeader() {
-      super(UUID.randomUUID(), TEST_INSTANCE_ID);
-    }
-  }
-
   @BeforeClass
   public static void beforeClass() throws Exception {
     kafka = new KafkaContainer();
@@ -176,7 +171,8 @@
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_sync";
     TestConsumer testConsumer = new TestConsumer(1);
-    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
 
     kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
     kafkaBrokerApi.send(testTopic, testEventMessage);
@@ -192,7 +188,8 @@
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_async";
     TestConsumer testConsumer = new TestConsumer(1);
-    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
 
     kafkaBrokerApi.send(testTopic, testEventMessage);
     kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
deleted file mode 100644
index e456a2a..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.subscribe;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.gson.Gson;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KafkaEventDeserializerTest {
-  private KafkaEventDeserializer deserializer;
-
-  @Before
-  public void setUp() {
-    final Gson gson = new EventGsonProvider().get();
-    deserializer = new KafkaEventDeserializer(gson);
-  }
-
-  @Test
-  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
-    final UUID eventId = UUID.randomUUID();
-    final String eventType = "event-type";
-    final UUID sourceInstanceId = UUID.randomUUID();
-    final long eventCreatedOn = 10L;
-    final String eventJson =
-        String.format(
-            "{ "
-                + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
-                + "\"body\": { \"type\": \"project-created\" }"
-                + "}",
-            eventId, eventType, sourceInstanceId, eventCreatedOn);
-    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
-
-    assertThat(event.getHeader().eventId).isEqualTo(eventId);
-    assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void kafkaEventDeserializerShouldFailForInvalidJson() {
-    deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
-  }
-
-  @Test(expected = RuntimeException.class)
-  public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
-    deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
-  }
-}