Use event deserialization logic from events broker
To avoid code repetition events deserialization was moved to
events-broker library
Bug: Issue 14593
Change-Id: Ib144252bf473689c4e926c241ff97d483de4b04d
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bbc3418..52ac706 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.4.0",
- sha1 = "031881f18def90f945b21c7aafda3a1ac95e89c8",
+ artifact = "com.gerritforge:events-broker:3.4.0.1",
+ sha1 = "2d406afa8787621442d855e4b458c97bd24f1198",
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index b92e8db..1357e44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,18 +14,13 @@
package com.googlesource.gerrit.plugins.kinesis;
-import static java.util.Objects.requireNonNull;
-
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import java.util.UUID;
import java.util.function.Consumer;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
@@ -44,14 +39,16 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Consumer<EventMessage> recordProcessor;
private final OneOffRequestContext oneOffCtx;
- private final Gson gson;
+ private final EventDeserializer eventDeserializer;
@Inject
KinesisRecordProcessor(
- @Assisted Consumer<EventMessage> recordProcessor, OneOffRequestContext oneOffCtx, Gson gson) {
+ @Assisted Consumer<EventMessage> recordProcessor,
+ OneOffRequestContext oneOffCtx,
+ EventDeserializer eventDeserializer) {
this.recordProcessor = recordProcessor;
this.oneOffCtx = oneOffCtx;
- this.gson = gson;
+ this.eventDeserializer = eventDeserializer;
}
@Override
@@ -76,7 +73,7 @@
String jsonMessage = new String(byteRecord);
logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
try (ManualRequestContext ctx = oneOffCtx.open()) {
- EventMessage eventMessage = deserialise(jsonMessage);
+ EventMessage eventMessage = eventDeserializer.deserialize(jsonMessage);
recordProcessor.accept(eventMessage);
} catch (Exception e) {
logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -87,23 +84,6 @@
}
}
- private EventMessage deserialise(String json) {
- EventMessage result = gson.fromJson(json, EventMessage.class);
- if (result.getEvent() == null && result.getHeader() == null) {
- Event event = deserialiseEvent(json);
- result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
- }
- result.validate();
- return result;
- }
-
- private Event deserialiseEvent(String json) {
- Event event = gson.fromJson(json, Event.class);
- requireNonNull(event.type, "Event type cannot be null");
- requireNonNull(event.instanceId, "Event instance id cannot be null");
- return event;
- }
-
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 2401034..1c30628 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.gerrit.server.events.Event;
@@ -47,6 +48,7 @@
public class KinesisRecordProcessorTest {
private KinesisRecordProcessor objectUnderTest;
private Gson gson = new EventGsonProvider().get();
+ private EventDeserializer eventDeserializer = new EventDeserializer(gson);
@Mock Consumer<EventMessage> succeedingConsumer;
@Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
@@ -56,7 +58,7 @@
@Before
public void setup() {
when(oneOffCtx.open()).thenReturn(requestContext);
- objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+ objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer);
}
@Test