Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Deserialize Event and EventMessage

Change-Id: I80ad1d967b164e2284cd2bcbc63190630fbbd4e2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index bab2ad0..4c57a54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,11 +14,16 @@
 
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
@@ -42,13 +47,46 @@
 
+  /**
+   * Deserializes a Kafka record value into an {@link EventMessage}.
+   *
+   * <p>The payload is first parsed as an {@link EventMessage}. If that yields neither an event
+   * nor a header, the payload is parsed again as a bare {@link Event} and wrapped in a new
+   * {@link EventMessage} whose header is built from a random UUID and the event's instance id.
+   *
+   * @param topic topic of the record; only forwarded to the string deserializer
+   * @param data raw record value
+   * @return the deserialized message, after {@code validate()} has been called on it
+   * @throws NullPointerException if Gson produces no object for the payload, or if a bare event
+   *     has no type or no instance id
+   */
   @Override
   public EventMessage deserialize(String topic, byte[] data) {
-    final EventMessage result =
-        gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
+    String json = stringDeserializer.deserialize(topic, data);
+    // Gson yields null for an empty or literal-null payload; fail here with a clear message.
+    EventMessage result =
+        requireNonNull(gson.fromJson(json, EventMessage.class), "Event message cannot be null");
+    if (result.getEvent() == null && result.getHeader() == null) {
+      // No envelope fields were found: treat the payload as a bare event and wrap it.
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
     result.validate();
-
     return result;
   }
 
+  /**
+   * Parses {@code json} as a bare {@link Event} and checks that its type and instance id are set.
+   *
+   * @param json JSON payload of the record
+   * @return the parsed event, with non-null {@code type} and {@code instanceId}
+   * @throws NullPointerException if no event is produced, or its type or instance id is null
+   */
+  private Event deserialiseEvent(String json) {
+    Event event = requireNonNull(gson.fromJson(json, Event.class), "Event cannot be null");
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
+
   @Override
   public void close() {}
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 8ea5f3f..f5b6861 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -34,7 +34,7 @@
   }
 
   @Test
-  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+  public void kafkaEventDeserializerShouldParseAKafkaEventMessage() {
     final UUID eventId = UUID.randomUUID();
     final String eventType = "event-type";
     final String sourceInstanceId = UUID.randomUUID().toString();
@@ -52,6 +52,29 @@
     assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
   }
 
+  @Test
+  public void kafkaEventDeserializerShouldParseKafkaEvent() {
+    final String eventJson = "{ \"type\": \"project-created\", \"instanceId\":\"instance-id\" }";
+    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+    // A bare event gets a synthesized header that carries the event's instance id.
+    assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id");
+    assertThat(event.getEvent().type).isEqualTo("project-created");
+    assertThat(event.getEvent().instanceId).isEqualTo("instance-id");
+  }
+
+  @Test
+  public void kafkaEventDeserializerShouldParseKafkaEventWithHeaderAndBodyProjectName() {
+    final String eventJson =
+        "{\"projectName\":\"header_body_parser_project\",\"type\":\"project-created\", \"instanceId\":\"instance-id\"}";
+    final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+    // Extra payload fields must not stop the payload from being wrapped as a bare event.
+    assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id");
+    assertThat(event.getEvent().type).isEqualTo("project-created");
+    assertThat(event.getEvent().instanceId).isEqualTo("instance-id");
+  }
+
   @Test(expected = RuntimeException.class)
   public void kafkaEventDeserializerShouldFailForInvalidJson() {
     deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));