Merge branch 'stable-3.3'

* stable-3.3:
  Deserialize Event and EventMessage
  Fix issue with message serialisation
  Bump events-broker to v3.3.2
  Add message content validation

Change-Id: I86d8e4f559ec4bf979ab31dded198abfbfc6553f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 70bbdc8..d26ffb9 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -161,4 +161,4 @@
         name = "grpc-auth",
         artifact = "io.grpc:grpc-auth:1.36.0",
         sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
-    )
\ No newline at end of file
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
index 82e6133..32110dd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -14,11 +14,11 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index a6058ae..c7be04c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -14,17 +14,22 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
+import static java.util.Objects.requireNonNull;
+
 import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.pubsub.v1.PubsubMessage;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -100,16 +105,55 @@
   MessageReceiver getMessageReceiver() {
     return (PubsubMessage message, AckReplyConsumer consumer) -> {
       try {
-        EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+        EventMessage event = deserialise(message.getData().toStringUtf8());
         messageProcessor.accept(event);
-        consumer.ack();
         subscriberMetrics.incrementSucceedToConsumeMessage();
       } catch (Exception e) {
         logger.atSevere().withCause(e).log(
             "Exception when consuming message %s from topic %s [message: %s]",
             message.getMessageId(), topic, message.getData().toStringUtf8());
         subscriberMetrics.incrementFailedToConsumeMessage();
+      } finally {
+        // Acknowledge on failure too: the error is logged and counted above.
+        consumer.ack();
       }
     };
   }
+
+  /**
+   * Parses a payload into an {@link EventMessage} and runs {@link EventMessage#validate()} on it.
+   *
+   * <p>If the JSON yields neither a header nor an event, it is parsed again as a bare {@link
+   * Event} and wrapped with a freshly generated event id and the event's own instance id.
+   *
+   * @param json the message data decoded as UTF-8
+   * @return the parsed, validated message
+   * @throws NullPointerException if the payload is empty or the bare event lacks a type or
+   *     instance id
+   */
+  private EventMessage deserialise(String json) {
+    // Gson returns null for empty input; fail with a clear message instead of a bare NPE.
+    EventMessage result =
+        requireNonNull(gson.fromJson(json, EventMessage.class), "Message payload cannot be empty");
+    if (result.getEvent() == null && result.getHeader() == null) {
+      Event event = deserialiseEvent(json);
+      result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+    }
+    result.validate();
+    return result;
+  }
+
+  /**
+   * Parses a payload as a bare {@link Event}.
+   *
+   * @param json the message data decoded as UTF-8
+   * @return the parsed event, with a non-null type and instance id
+   * @throws NullPointerException if the event has no type or no instance id
+   */
+  private Event deserialiseEvent(String json) {
+    Event event = gson.fromJson(json, Event.class);
+    requireNonNull(event.type, "Event type cannot be null");
+    requireNonNull(event.instanceId, "Event instance id cannot be null");
+    return event;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index ec2dd69..c5a8318 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -14,17 +14,27 @@
 
 package com.googlesource.gerrit.plugins.pubsub;
 
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
-import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gson.Gson;
+import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
+import java.util.UUID;
 import java.util.function.Consumer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -35,8 +45,14 @@
   @Mock SubscriberProvider subscriberProviderMock;
   @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
   @Mock AckReplyConsumer ackReplyConsumerMock;
+  @Mock Consumer<EventMessage> succeedingConsumer;
+  @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
 
   private static final String TOPIC = "foo";
+  private static final EventMessage eventMessage =
+      new EventMessage(
+          new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+  private Gson gson = new EventGsonProvider().get();
 
   @Test
   public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() {
@@ -53,22 +69,69 @@
 
   @Test
   public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() {
-    Consumer<EventMessage> succeedingConsumer = (message) -> {};
+    PubsubMessage pubsubMessage = sampleMessage();
 
-    messageReceiver(succeedingConsumer)
-        .receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+    messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
 
     verify(pubSubSubscriberMetricsMock, only()).incrementSucceedToConsumeMessage();
   }
 
+  @Test
+  public void shouldSkipEventWithoutSourceInstanceId() {
+    Event event = new ProjectCreatedEvent();
+    EventMessage messageWithoutSourceInstanceId =
+        new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+    PubsubMessage pubsubMessage = sampleMessage(messageWithoutSourceInstanceId);
+    // A header without a source instance id is expected to be rejected by the receiver.
+    messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+    // The consumer would get a freshly deserialised instance, so match on type, not identity.
+    verify(succeedingConsumer, never()).accept(any(EventMessage.class));
+  }
+
+  @Test
+  public void shouldParseEventObject() {
+    // Publish a bare Event rather than a serialised EventMessage.
+    Event bareEvent = new ProjectCreatedEvent();
+    bareEvent.instanceId = "instance-id";
+    MessageReceiver receiver = messageReceiver(succeedingConsumer);
+    receiver.receiveMessage(sampleMessage(bareEvent), ackReplyConsumerMock);
+
+    // The delivered message's header must carry the event's instance id.
+    verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+    EventMessage delivered = eventMessageCaptor.getValue();
+    assertThat(delivered.getHeader().sourceInstanceId).isEqualTo(bareEvent.instanceId);
+  }
+
+  @Test
+  public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+    // NOTE(review): name contains "header"/"body"; presumably must still parse as a bare event.
+    ProjectCreatedEvent bareEvent = new ProjectCreatedEvent();
+    bareEvent.projectName = "header_body_parser_project";
+    bareEvent.instanceId = "instance-id";
+    MessageReceiver receiver = messageReceiver(succeedingConsumer);
+    receiver.receiveMessage(sampleMessage(bareEvent), ackReplyConsumerMock);
+    verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+  }
+
+  /** Wraps the JSON form of a bare event in a Pub/Sub message. */
+  private PubsubMessage sampleMessage(Event event) {
+    ByteString json = ByteString.copyFromUtf8(gson.toJson(event));
+    return PubsubMessage.newBuilder().setData(json).build();
+  }
+
+  /** Wraps the JSON form of a full event message in a Pub/Sub message. */
+  private PubsubMessage sampleMessage(EventMessage message) {
+    ByteString json = ByteString.copyFromUtf8(gson.toJson(message));
+    return PubsubMessage.newBuilder().setData(json).build();
+  }
+
+  private PubsubMessage sampleMessage() {
+    return sampleMessage(eventMessage);
+  }
+
   private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) {
     return new PubSubEventSubscriber(
-            OutputFormat.JSON_COMPACT.newGson(),
-            subscriberProviderMock,
-            confMock,
-            pubSubSubscriberMetricsMock,
-            TOPIC,
-            consumer)
+            gson, subscriberProviderMock, confMock, pubSubSubscriberMetricsMock, TOPIC, consumer)
         .getMessageReceiver();
   }
 }