Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Deserialize Event and EventMessage
Change-Id: I80ad1d967b164e2284cd2bcbc63190630fbbd4e2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index bab2ad0..4c57a54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,11 +14,16 @@
package com.googlesource.gerrit.plugins.kafka.subscribe;
+import static java.util.Objects.requireNonNull;
+
import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.server.events.Event;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Map;
+import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -42,13 +47,23 @@
@Override
public EventMessage deserialize(String topic, byte[] data) {
- final EventMessage result =
- gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
+ String json = stringDeserializer.deserialize(topic, data); // decode the raw record bytes into JSON text
+ EventMessage result = gson.fromJson(json, EventMessage.class); // first attempt: treat the payload as a full EventMessage; NOTE(review): Gson returns null for null/empty input, which would NPE on the next line - confirm such payloads cannot arrive
+ if (result.getEvent() == null && result.getHeader() == null) { // neither part was populated: fall back to parsing the payload as a bare Event
+ Event event = deserialiseEvent(json);
+ result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event); // wrap the bare Event with a freshly generated id and the event's own instanceId
+ }
result.validate();
-
return result;
}
+ private Event deserialiseEvent(String json) { // parses the JSON text as a bare Event and checks its mandatory fields
+ Event event = gson.fromJson(json, Event.class);
+ requireNonNull(event.type, "Event type cannot be null"); // throws NullPointerException with this message when type is missing
+ requireNonNull(event.instanceId, "Event instance id cannot be null"); // instanceId is needed by the caller to build the message header
+ return event;
+ }
+
@Override
public void close() {}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 8ea5f3f..f5b6861 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -34,7 +34,7 @@
}
@Test
- public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+ public void kafkaEventDeserializerShouldParseAKafkaEventMessage() {
final UUID eventId = UUID.randomUUID();
final String eventType = "event-type";
final String sourceInstanceId = UUID.randomUUID().toString();
@@ -52,6 +52,23 @@
assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
}
+ @Test
+ public void kafkaEventDeserializerShouldParseKafkaEvent() { // a bare Event payload (only type and instanceId) must still produce an EventMessage
+ final String eventJson = "{ \"type\": \"project-created\", \"instanceId\":\"instance-id\" }";
+ final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8)); // the topic name is an arbitrary placeholder here
+
+ assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id"); // the header must carry the instanceId taken from the payload
+ }
+
+ @Test
+ public void kafkaEventDeserializerShouldParseKafkaEventWithHeaderAndBodyProjectName() { // same bare-event scenario, with an additional projectName field in the payload
+ final String eventJson =
+ "{\"projectName\":\"header_body_parser_project\",\"type\":\"project-created\", \"instanceId\":\"instance-id\"}";
+ final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8)); // the topic name is an arbitrary placeholder here
+
+ assertThat(event.getHeader().sourceInstanceId).isEqualTo("instance-id"); // the header must carry the instanceId taken from the payload
+ }
+
@Test(expected = RuntimeException.class)
public void kafkaEventDeserializerShouldFailForInvalidJson() {
deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));