Add message content validation

Add message validation to the message receiver. This allows to avoid
issues during the message processing. For example validation will
discard all the messages without source instance id.

Bug: Issue 14423
Change-Id: I0445ab21449eccaa9214156077218c2268f5c5a7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index a57aa3f..f142b73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -72,6 +72,7 @@
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
                   EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
+                  eventMessage.validate();
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
new file mode 100644
index 0000000..5e0ff74
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisRecordProcessorTest {
+  private KinesisRecordProcessor objectUnderTest;
+  private Gson gson = new EventGsonProvider().get();
+
+  @Mock Consumer<EventMessage> succeedingConsumer;
+  @Mock OneOffRequestContext oneOffCtx;
+  @Mock ManualRequestContext requestContext;
+
+  @Before
+  public void setup() {
+    when(oneOffCtx.open()).thenReturn(requestContext);
+    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+  }
+
+  @Test
+  public void shouldSkipEventWithoutSourceInstanceId() {
+    Event event = new ProjectCreatedEvent();
+    EventMessage messageWithoutSourceInstanceId =
+        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+
+    Record kinesisRecord =
+        Record.builder()
+            .data(SdkBytes.fromUtf8String(gson.toJson(messageWithoutSourceInstanceId)))
+            .build();
+    ProcessRecordsInput kinesisInput =
+        ProcessRecordsInput.builder()
+            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
+            .build();
+
+    objectUnderTest.processRecords(kinesisInput);
+
+    verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+  }
+}