Merge branch 'stable-3.3'
* stable-3.3:
Deserialize Event and EventMessage
Change-Id: Ife31563339c7506e8bceca6ff9bd7c6436a2d417
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index f142b73..b92e8db 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,13 +14,18 @@
package com.googlesource.gerrit.plugins.kinesis;
+import static java.util.Objects.requireNonNull;
+
import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
import java.util.function.Consumer;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
@@ -71,8 +76,7 @@
String jsonMessage = new String(byteRecord);
logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
try (ManualRequestContext ctx = oneOffCtx.open()) {
- EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
- eventMessage.validate();
+ EventMessage eventMessage = deserialise(jsonMessage);
recordProcessor.accept(eventMessage);
} catch (Exception e) {
logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -83,6 +87,23 @@
}
}
+ private EventMessage deserialise(String json) {
+ EventMessage result = gson.fromJson(json, EventMessage.class);
+ if (result.getEvent() == null && result.getHeader() == null) {
+ Event event = deserialiseEvent(json);
+ result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+ }
+ result.validate();
+ return result;
+ }
+
+ private Event deserialiseEvent(String json) {
+ Event event = gson.fromJson(json, Event.class);
+ requireNonNull(event.type, "Event type cannot be null");
+ requireNonNull(event.instanceId, "Event instance id cannot be null");
+ return event;
+ }
+
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 5e0ff74..2401034 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -14,7 +14,10 @@
package com.googlesource.gerrit.plugins.kinesis;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -31,6 +34,8 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.SdkBytes;
@@ -44,6 +49,7 @@
private Gson gson = new EventGsonProvider().get();
@Mock Consumer<EventMessage> succeedingConsumer;
+ @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
@Mock OneOffRequestContext oneOffCtx;
@Mock ManualRequestContext requestContext;
@@ -59,17 +65,91 @@
EventMessage messageWithoutSourceInstanceId =
new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
- Record kinesisRecord =
- Record.builder()
- .data(SdkBytes.fromUtf8String(gson.toJson(messageWithoutSourceInstanceId)))
- .build();
- ProcessRecordsInput kinesisInput =
- ProcessRecordsInput.builder()
- .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
- .build();
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
objectUnderTest.processRecords(kinesisInput);
verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
}
+
+ @Test
+ public void shouldParseEventObject() {
+ String instanceId = "instance-id";
+
+ Event event = new ProjectCreatedEvent();
+ event.instanceId = instanceId;
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+
+ EventMessage result = eventMessageCaptor.getValue();
+ assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+ }
+
+ @Test
+ public void shouldSkipEventObjectWithoutInstanceId() {
+ Event event = new ProjectCreatedEvent();
+ event.instanceId = null;
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, never()).accept(any());
+ }
+
+ @Test
+ public void shouldSkipEventObjectWithUnknownType() {
+ String instanceId = "instance-id";
+ Event event = new Event("unknown-type") {};
+ event.instanceId = instanceId;
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, never()).accept(any());
+ }
+
+ @Test
+ public void shouldSkipEventObjectWithoutType() {
+ String instanceId = "instance-id";
+ Event event = new Event(null) {};
+ event.instanceId = instanceId;
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, never()).accept(any());
+ }
+
+ @Test
+ public void shouldSkipEmptyObjectJsonPayload() {
+ String emptyJsonObject = "{}";
+
+ ProcessRecordsInput kinesisInput = sampleMessage(emptyJsonObject);
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, never()).accept(any());
+ }
+
+ @Test
+ public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = "instance-id";
+ event.projectName = "header_body_parser_project";
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ objectUnderTest.processRecords(kinesisInput);
+
+ verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+ }
+
+ private ProcessRecordsInput sampleMessage(String message) {
+ Record kinesisRecord = Record.builder().data(SdkBytes.fromUtf8String(message)).build();
+ ProcessRecordsInput kinesisInput =
+ ProcessRecordsInput.builder()
+ .records(Collections.singletonList(KinesisClientRecord.fromRecord(kinesisRecord)))
+ .build();
+ return kinesisInput;
+ }
}