Use event deserialization logic from events broker

To avoid code repetition events deserialization was moved to
events-broker library

Bug: Issue 14593
Change-Id: Ib144252bf473689c4e926c241ff97d483de4b04d
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bbc3418..52ac706 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0",
-        sha1 = "031881f18def90f945b21c7aafda3a1ac95e89c8",
+        artifact = "com.gerritforge:events-broker:3.4.0.1",
+        sha1 = "2d406afa8787621442d855e4b458c97bd24f1198",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index b92e8db..1357e44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,18 +14,13 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
-import static java.util.Objects.requireNonNull;
-
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import java.util.UUID;
 import java.util.function.Consumer;
 import software.amazon.kinesis.exceptions.InvalidStateException;
 import software.amazon.kinesis.exceptions.ShutdownException;
@@ -44,14 +39,16 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final Consumer<EventMessage> recordProcessor;
   private final OneOffRequestContext oneOffCtx;
-  private final Gson gson;
+  private final EventDeserializer eventDeserializer;
 
   @Inject
   KinesisRecordProcessor(
-      @Assisted Consumer<EventMessage> recordProcessor, OneOffRequestContext oneOffCtx, Gson gson) {
+      @Assisted Consumer<EventMessage> recordProcessor,
+      OneOffRequestContext oneOffCtx,
+      EventDeserializer eventDeserializer) {
     this.recordProcessor = recordProcessor;
     this.oneOffCtx = oneOffCtx;
-    this.gson = gson;
+    this.eventDeserializer = eventDeserializer;
   }
 
   @Override
@@ -76,7 +73,7 @@
                 String jsonMessage = new String(byteRecord);
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage eventMessage = deserialise(jsonMessage);
+                  EventMessage eventMessage = eventDeserializer.deserialize(jsonMessage);
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -87,23 +84,6 @@
     }
   }
 
-  private EventMessage deserialise(String json) {
-    EventMessage result = gson.fromJson(json, EventMessage.class);
-    if (result.getEvent() == null && result.getHeader() == null) {
-      Event event = deserialiseEvent(json);
-      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
-    }
-    result.validate();
-    return result;
-  }
-
-  private Event deserialiseEvent(String json) {
-    Event event = gson.fromJson(json, Event.class);
-    requireNonNull(event.type, "Event type cannot be null");
-    requireNonNull(event.instanceId, "Event instance id cannot be null");
-    return event;
-  }
-
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
     logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 2401034..1c30628 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
 import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.server.events.Event;
@@ -47,6 +48,7 @@
 public class KinesisRecordProcessorTest {
   private KinesisRecordProcessor objectUnderTest;
   private Gson gson = new EventGsonProvider().get();
+  private EventDeserializer eventDeserializer = new EventDeserializer(gson);
 
   @Mock Consumer<EventMessage> succeedingConsumer;
   @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
@@ -56,7 +58,7 @@
   @Before
   public void setup() {
     when(oneOffCtx.open()).thenReturn(requestContext);
-    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer);
   }
 
   @Test