Merge branch 'stable-3.3'
* stable-3.3:
Deserialize Event and EventMessage
Fix issue with message serialisation
Bump events-broker to v3.3.2
Add message content validation
Change-Id: I86d8e4f559ec4bf979ab31dded198abfbfc6553f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 70bbdc8..d26ffb9 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -161,4 +161,4 @@
name = "grpc-auth",
artifact = "io.grpc:grpc-auth:1.36.0",
sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
- )
\ No newline at end of file
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
index 82e6133..32110dd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -14,11 +14,11 @@
package com.googlesource.gerrit.plugins.pubsub;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.api.gax.core.CredentialsProvider;
import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index a6058ae..c7be04c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -14,17 +14,22 @@
package com.googlesource.gerrit.plugins.pubsub;
+import static java.util.Objects.requireNonNull;
+
import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -100,16 +105,34 @@
MessageReceiver getMessageReceiver() {
return (PubsubMessage message, AckReplyConsumer consumer) -> {
try {
- EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+ EventMessage event = deserialise(message.getData().toStringUtf8());
messageProcessor.accept(event);
- consumer.ack();
subscriberMetrics.incrementSucceedToConsumeMessage();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Exception when consuming message %s from topic %s [message: %s]",
message.getMessageId(), topic, message.getData().toStringUtf8());
subscriberMetrics.incrementFailedToConsumeMessage();
+ } finally {
+ consumer.ack();
}
};
}
+
+ private EventMessage deserialise(String json) {
+ EventMessage result = gson.fromJson(json, EventMessage.class);
+ if (result.getEvent() == null && result.getHeader() == null) {
+ Event event = deserialiseEvent(json);
+ result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
+ }
+ result.validate();
+ return result;
+ }
+
+ private Event deserialiseEvent(String json) {
+ Event event = gson.fromJson(json, Event.class);
+ requireNonNull(event.type, "Event type cannot be null");
+ requireNonNull(event.instanceId, "Event instance id cannot be null");
+ return event;
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index ec2dd69..c5a8318 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -14,17 +14,27 @@
package com.googlesource.gerrit.plugins.pubsub;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
-import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gson.Gson;
+import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
+import java.util.UUID;
import java.util.function.Consumer;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -35,8 +45,14 @@
@Mock SubscriberProvider subscriberProviderMock;
@Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
@Mock AckReplyConsumer ackReplyConsumerMock;
+ @Mock Consumer<EventMessage> succeedingConsumer;
+ @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
private static final String TOPIC = "foo";
+ private static final EventMessage eventMessage =
+ new EventMessage(
+ new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+ private Gson gson = new EventGsonProvider().get();
@Test
public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() {
@@ -53,22 +69,69 @@
@Test
public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() {
- Consumer<EventMessage> succeedingConsumer = (message) -> {};
+ PubsubMessage pubsubMessage = sampleMessage();
- messageReceiver(succeedingConsumer)
- .receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+ messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
verify(pubSubSubscriberMetricsMock, only()).incrementSucceedToConsumeMessage();
}
+ @Test
+ public void shouldSkipEventWithoutSourceInstanceId() {
+ Event event = new ProjectCreatedEvent();
+ EventMessage messageWithoutSourceInstanceId =
+ new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+ PubsubMessage pubsubMessage = sampleMessage(messageWithoutSourceInstanceId);
+
+ messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+
+ verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+ }
+
+ @Test
+ public void shouldParseEventObject() {
+ String instanceId = "instance-id";
+
+ Event event = new ProjectCreatedEvent();
+ event.instanceId = instanceId;
+ PubsubMessage pubsubMessage = sampleMessage(event);
+ messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+
+ verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
+ EventMessage result = eventMessageCaptor.getValue();
+ assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+ }
+
+ @Test
+ public void shouldParseEventObjectWithHeaderAndBodyProjectName() {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = "instance-id";
+ event.projectName = "header_body_parser_project";
+ PubsubMessage pubsubMessage = sampleMessage(event);
+ messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock);
+
+ verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+ }
+
+ private PubsubMessage sampleMessage(Event event) {
+ String eventPayload = gson.toJson(event);
+ ByteString data = ByteString.copyFromUtf8(eventPayload);
+ return PubsubMessage.newBuilder().setData(data).build();
+ }
+
+ private PubsubMessage sampleMessage(EventMessage message) {
+ String eventPayload = gson.toJson(message);
+ ByteString data = ByteString.copyFromUtf8(eventPayload);
+ return PubsubMessage.newBuilder().setData(data).build();
+ }
+
+ private PubsubMessage sampleMessage() {
+ return sampleMessage(eventMessage);
+ }
+
private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) {
return new PubSubEventSubscriber(
- OutputFormat.JSON_COMPACT.newGson(),
- subscriberProviderMock,
- confMock,
- pubSubSubscriberMetricsMock,
- TOPIC,
- consumer)
+ gson, subscriberProviderMock, confMock, pubSubSubscriberMetricsMock, TOPIC, consumer)
.getMessageReceiver();
}
}