Merge branch 'stable-3.3'

* stable-3.3:
  Deserialize Event and EventMessage

Change-Id: Ife31563339c7506e8bceca6ff9bd7c6436a2d417
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index f142b73..b92e8db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,13 +14,18 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
 import java.util.function.Consumer;
 import software.amazon.kinesis.exceptions.InvalidStateException;
 import software.amazon.kinesis.exceptions.ShutdownException;
@@ -71,8 +76,7 @@
                 String jsonMessage = new String(byteRecord);
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
-                  eventMessage.validate();
+                  EventMessage eventMessage = deserialise(jsonMessage);
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -83,6 +87,23 @@
     }
   }
 
+  private EventMessage deserialise(String json) {
+    EventMessage result = gson.fromJson(json, EventMessage.class);
+    if (result.getEvent() == null && result.getHeader() == null) {
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
+    result.validate();
+    return result;
+  }
+
+  private Event deserialiseEvent(String json) {
+    Event event = gson.fromJson(json, Event.class);
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
+
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
     logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 5e0ff74..2401034 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -14,7 +14,10 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -31,6 +34,8 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.core.SdkBytes;
@@ -44,6 +49,7 @@
   private Gson gson = new EventGsonProvider().get();
 
   @Mock Consumer<EventMessage> succeedingConsumer;
+  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
   @Mock OneOffRequestContext oneOffCtx;
   @Mock ManualRequestContext requestContext;
 
@@ -59,17 +65,91 @@
     EventMessage messageWithoutSourceInstanceId =
         new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
 
-    Record kinesisRecord =
-        Record.builder()
-            .data(SdkBytes.fromUtf8String(gson.toJson(messageWithoutSourceInstanceId)))
-            .build();
-    ProcessRecordsInput kinesisInput =
-        ProcessRecordsInput.builder()
-            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
-            .build();
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
 
     objectUnderTest.processRecords(kinesisInput);
 
     verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
   }
+
+  @Test
+  public void shouldParseEventObject() {
+    String instanceId = "instance-id";
+
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+
+    EventMessage result = eventMessageCaptor.getValue();
+    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithoutInstanceId() {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = null;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithUnknownType() {
+    String instanceId = "instance-id";
+    Event event = new Event("unknown-type") {};
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithoutType() {
+    String instanceId = "instance-id";
+    Event event = new Event(null) {};
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEmptyObjectJsonPayload() {
+    String emptyJsonObject = "{}";
+
+    ProcessRecordsInput kinesisInput = sampleMessage(emptyJsonObject);
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = "instance-id";
+    event.projectName = "header_body_parser_project";
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+  }
+
+  private ProcessRecordsInput sampleMessage(String message) {
+    Record kinesisRecord = Record.builder().data(SdkBytes.fromUtf8String(message)).build();
+    ProcessRecordsInput kinesisInput =
+        ProcessRecordsInput.builder()
+            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
+            .build();
+    return kinesisInput;
+  }
 }