Deserialize Event and EventMessage

From Gerrit v3.2 event contains instance id field. In version 3.4
EventMessage envelope will be replaced with Event. To allow rolling
upgrade compatibility between v3.3 and v3.4 need to be assured. To do
that KafkaEventDeserializer must be able to handle both message types.

Bug: Issue 14390
Change-Id: I270f8fdf1c1bcbd537d03c19087113b483f3b6f4
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 61d20ad..bd4656f 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.0-rc7",
-        sha1 = "5efe1c4a0f7c385b0ec95b8f9897248049c4173c",
+        artifact = "com.gerritforge:events-broker:3.3.2",
+        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index bab2ad0..4c57a54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,11 +14,16 @@
 
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
@@ -42,13 +47,23 @@
 
   @Override
   public EventMessage deserialize(String topic, byte[] data) {
-    final EventMessage result =
-        gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
+    String json = stringDeserializer.deserialize(topic, data);
+    EventMessage result = gson.fromJson(json, EventMessage.class);
+    if (result.getEvent() == null && result.getHeader() == null) {
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
     result.validate();
-
     return result;
   }
 
+  private Event deserialiseEvent(String json) {
+    Event event = gson.fromJson(json, Event.class);
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
+
   @Override
   public void close() {}
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index e456a2a..4074919 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -34,10 +34,10 @@
   }
 
   @Test
-  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+  public void kafkaEventDeserializerShouldParseAKafkaEventMessage() {
     final UUID eventId = UUID.randomUUID();
     final String eventType = "event-type";
-    final UUID sourceInstanceId = UUID.randomUUID();
+    final String sourceInstanceId = UUID.randomUUID().toString();
     final long eventCreatedOn = 10L;
     final String eventJson =
         String.format(
@@ -52,6 +52,23 @@
     assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
   }
 
+  @Test
+  public void kafkaEventDeserializerShouldParseKafkaEvent() {
+    final String eventJson = "{ \"type\": \"project-created\", \"instanceId\":\"instance-id\" }";
+    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+    assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id");
+  }
+
+  @Test
+  public void kafkaEventDeserializerShouldParseKafkaEventWithHeaderAndBodyProjectName() {
+    final String eventJson =
+        "{\"projectName\":\"header_body_parser_project\",\"type\":\"project-created\", \"instanceId\":\"instance-id\"}";
+    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+    assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id");
+  }
+
   @Test(expected = RuntimeException.class)
   public void kafkaEventDeserializerShouldFailForInvalidJson() {
     deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));