Merge branch 'stable-3.3'

* stable-3.3:
  Add message content validation
  Use events-broker 3.3.2

Change-Id: I963bde4e53eb11faee8077cc1784b0af020caa73
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index a57aa3f..f142b73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -72,6 +72,7 @@
                 logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                 try (ManualRequestContext ctx = oneOffCtx.open()) {
                   EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
+                  eventMessage.validate();
                   recordProcessor.accept(eventMessage);
                 } catch (Exception e) {
                   logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
new file mode 100644
index 0000000..5e0ff74
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisRecordProcessorTest {
+  private KinesisRecordProcessor objectUnderTest;
+  private final Gson gson = new EventGsonProvider().get();
+
+  @Mock Consumer<EventMessage> succeedingConsumer;
+  @Mock OneOffRequestContext oneOffCtx;
+  @Mock ManualRequestContext requestContext;
+
+  @Before
+  public void setup() {
+    when(oneOffCtx.open()).thenReturn(requestContext);
+    objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+  }
+
+  @Test
+  public void shouldSkipEventWithoutSourceInstanceId() {
+    Event event = new ProjectCreatedEvent();
+    EventMessage messageWithoutSourceInstanceId =
+        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+    // Serialize the message to JSON and wrap it in the record types processRecords() consumes.
+    Record kinesisRecord =
+        Record.builder()
+            .data(SdkBytes.fromUtf8String(gson.toJson(messageWithoutSourceInstanceId)))
+            .build();
+    ProcessRecordsInput kinesisInput =
+        ProcessRecordsInput.builder()
+            .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
+            .build();
+
+    objectUnderTest.processRecords(kinesisInput);
+    // The consumer would get a new Gson-deserialized instance, so reject any argument at all.
+    verify(succeedingConsumer, never()).accept(org.mockito.ArgumentMatchers.any());
+  }
+}