Deserialize Event and EventMessage

From Gerrit v3.2 event contains instance id field. In version 3.4
EventMessage envelope will be replaced with Event. To allow rolling
upgrade compatibility between v3.3 and v3.4 need to be assured. To do
that KinesisRecordProcessor must be able to handle both message types.

Bug: Issue 14390
Change-Id: I0236f67e7dd75dcb18d0473417c59a11a63f2f97
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index f142b73..b92e8db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,13 +14,18 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
 import java.util.function.Consumer;
 import software.amazon.kinesis.exceptions.InvalidStateException;
 import software.amazon.kinesis.exceptions.ShutdownException;
@@ -71,8 +76,7 @@
                 String jsonMessage = new String(byteRecord);
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
-                  eventMessage.validate();
+                  EventMessage eventMessage = deserialise(jsonMessage);
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -83,6 +87,23 @@
     }
   }
 
+  private EventMessage deserialise(String json) {
+    EventMessage result = gson.fromJson(json, EventMessage.class);
+    if (result.getEvent() == null && result.getHeader() == null) {
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
+    result.validate();
+    return result;
+  }
+
+  private Event deserialiseEvent(String json) {
+    Event event = gson.fromJson(json, Event.class);
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
+
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
     logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 5e0ff74..2401034 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -14,7 +14,10 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -31,6 +34,8 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import software.amazon.awssdk.core.SdkBytes;
@@ -44,6 +49,7 @@
   private Gson gson = new EventGsonProvider().get();
 
   @Mock Consumer<EventMessage> succeedingConsumer;
+  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
   @Mock OneOffRequestContext oneOffCtx;
   @Mock ManualRequestContext requestContext;
 
@@ -59,17 +65,91 @@
     EventMessage messageWithoutSourceInstanceId =
         new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
 
-    Record kinesisRecord =
-        Record.builder()
-            .data(SdkBytes.fromUtf8String(gson.toJson(messageWithoutSourceInstanceId)))
-            .build();
-    ProcessRecordsInput kinesisInput =
-        ProcessRecordsInput.builder()
-            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
-            .build();
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
 
     objectUnderTest.processRecords(kinesisInput);
 
     verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
   }
+
+  @Test
+  public void shouldParseEventObject() {
+    String instanceId = "instance-id";
+
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+
+    EventMessage result = eventMessageCaptor.getValue();
+    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithoutInstanceId() {
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = null;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithUnknownType() {
+    String instanceId = "instance-id";
+    Event event = new Event("unknown-type") {};
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEventObjectWithoutType() {
+    String instanceId = "instance-id";
+    Event event = new Event(null) {};
+    event.instanceId = instanceId;
+
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldSkipEmptyObjectJsonPayload() {
+    String emptyJsonObject = "{}";
+
+    ProcessRecordsInput kinesisInput = sampleMessage(emptyJsonObject);
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(any());
+  }
+
+  @Test
+  public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = "instance-id";
+    event.projectName = "header_body_parser_project";
+    ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+  }
+
+  private ProcessRecordsInput sampleMessage(String message) {
+    Record kinesisRecord = Record.builder().data(SdkBytes.fromUtf8String(message)).build();
+    ProcessRecordsInput kinesisInput =
+        ProcessRecordsInput.builder()
+            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
+            .build();
+    return kinesisInput;
+  }
 }