Deserialize Event and EventMessage

From Gerrit v3.2 event contains instance id field. In version 3.4
EventMessage envelope will be replaced with Event. To allow rolling
upgrade compatibility between v3.3 and v3.4 need to be assured. To do
that PubSubEventSubscriber must be able to handle both message types.

Bug: Issue 14390
Change-Id: I690af72de6fbce59650a4fbe74e347d5d6cb6fb9
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index f58f3ec..c7be04c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -14,17 +14,22 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -100,8 +105,7 @@
   MessageReceiver getMessageReceiver() {
     return (PubsubMessage message, AckReplyConsumer consumer) -> {
       try {
-        EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
-        event.validate();
+        EventMessage event = deserialise(message.getData().toStringUtf8());
         messageProcessor.accept(event);
         subscriberMetrics.incrementSucceedToConsumeMessage();
       } catch (Exception e) {
@@ -114,4 +118,21 @@
       }
     };
   }
+
+  private EventMessage deserialise(String json) {
+    EventMessage result = gson.fromJson(json, EventMessage.class);
+    if (result.getEvent() == null && result.getHeader() == null) {
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
+    result.validate();
+    return result;
+  }
+
+  private Event deserialiseEvent(String json) {
+    Event event = gson.fromJson(json, Event.class);
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index 1ee8e8a..c5a8318 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
@@ -31,6 +33,8 @@
 import java.util.function.Consumer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -42,6 +46,7 @@
   @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
   @Mock AckReplyConsumer ackReplyConsumerMock;
   @Mock Consumer<EventMessage> succeedingConsumer;
+  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
 
   private static final String TOPIC = "foo";
   private static final EventMessage eventMessage =
@@ -83,6 +88,37 @@
     verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
   }
 
+  @Test
+  public void shouldParseEventObject() {
+    String instanceId = "instance-id";
+
+    Event event = new ProjectCreatedEvent();
+    event.instanceId = instanceId;
+    PubsubMessage pubsubMessage = sampleMessage(event);
+    messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+
+    verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+    EventMessage result = eventMessageCaptor.getValue();
+    assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+  }
+
+  @Test
+  public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = "instance-id";
+    event.projectName = "header_body_parser_project";
+    PubsubMessage pubsubMessage = sampleMessage(event);
+    messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+
+    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+  }
+
+  private PubsubMessage sampleMessage(Event event) {
+    String eventPayload = gson.toJson(event);
+    ByteString data = ByteString.copyFromUtf8(eventPayload);
+    return PubsubMessage.newBuilder().setData(data).build();
+  }
+
   private PubsubMessage sampleMessage(EventMessage message) {
     String eventPayload = gson.toJson(message);
     ByteString data = ByteString.copyFromUtf8(eventPayload);