Merge branch 'stable-3.3' * stable-3.3: Deserialize Event and EventMessage Fix issue with message serialisation Bump events-broker to v3.3.2 Add message content validation Change-Id: I86d8e4f559ec4bf979ab31dded198abfbfc6553f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl index 70bbdc8..d26ffb9 100644 --- a/external_plugin_deps.bzl +++ b/external_plugin_deps.bzl
@@ -161,4 +161,4 @@ name = "grpc-auth", artifact = "io.grpc:grpc-auth:1.36.0", sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e", - ) \ No newline at end of file + )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java index 82e6133..32110dd 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -14,11 +14,11 @@ package com.googlesource.gerrit.plugins.pubsub; +import com.gerritforge.gerrit.eventbroker.EventGsonProvider; import com.google.api.gax.core.CredentialsProvider; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicSet; -import com.google.gerrit.server.events.EventGsonProvider; import com.google.gerrit.server.events.EventListener; import com.google.gson.Gson; import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java index a6058ae..c7be04c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java +++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -14,17 +14,22 @@ package com.googlesource.gerrit.plugins.pubsub; +import static java.util.Objects.requireNonNull; + import com.gerritforge.gerrit.eventbroker.EventMessage; +import com.gerritforge.gerrit.eventbroker.EventMessage.Header; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.server.events.Event; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -100,16 +105,34 @@ MessageReceiver getMessageReceiver() { return (PubsubMessage message, AckReplyConsumer consumer) -> { try { - EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class); + EventMessage event = deserialise(message.getData().toStringUtf8()); messageProcessor.accept(event); - consumer.ack(); subscriberMetrics.incrementSucceedToConsumeMessage(); } catch (Exception e) { logger.atSevere().withCause(e).log( "Exception when consuming message %s from topic %s [message: %s]", message.getMessageId(), topic, message.getData().toStringUtf8()); subscriberMetrics.incrementFailedToConsumeMessage(); + } finally { + consumer.ack(); } }; } + + private EventMessage deserialise(String json) { + EventMessage result = gson.fromJson(json, EventMessage.class); + if (result.getEvent() == null && result.getHeader() == null) { + Event event = deserialiseEvent(json); + result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event); + } + result.validate(); + return result; + } + + private Event deserialiseEvent(String json) { + Event event = gson.fromJson(json, Event.class); + requireNonNull(event.type, "Event type cannot be null"); + requireNonNull(event.instanceId, "Event instance id cannot be null"); + return event; + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java index ec2dd69..c5a8318 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -14,17 +14,27 @@ package com.googlesource.gerrit.plugins.pubsub; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.only; import static org.mockito.Mockito.verify; import com.gerritforge.gerrit.eventbroker.EventMessage; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; -import com.google.gerrit.json.OutputFormat; +import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.events.EventGsonProvider; +import com.google.gerrit.server.events.ProjectCreatedEvent; +import com.google.gson.Gson; +import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; +import java.util.UUID; import java.util.function.Consumer; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -35,8 +45,14 @@ @Mock SubscriberProvider subscriberProviderMock; @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock; @Mock AckReplyConsumer ackReplyConsumerMock; + @Mock Consumer<EventMessage> succeedingConsumer; + @Captor ArgumentCaptor<EventMessage> eventMessageCaptor; private static final String TOPIC = "foo"; + private static final EventMessage eventMessage = + new EventMessage( + new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent()); + private Gson gson = new EventGsonProvider().get(); @Test public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() { @@ -53,22 +69,69 @@ @Test public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() { - Consumer<EventMessage> succeedingConsumer = (message) -> {}; + PubsubMessage pubsubMessage = sampleMessage(); - messageReceiver(succeedingConsumer) - .receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock); + messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock); verify(pubSubSubscriberMetricsMock, only()).incrementSucceedToConsumeMessage(); } + @Test + public void shouldSkipEventWithoutSourceInstanceId() { + Event event = new ProjectCreatedEvent(); + EventMessage messageWithoutSourceInstanceId = + new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event); + PubsubMessage pubsubMessage = sampleMessage(messageWithoutSourceInstanceId); + + messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock); + + verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId); + } + + @Test + public void shouldParseEventObject() { + String instanceId = "instance-id"; + + Event event = new ProjectCreatedEvent(); + event.instanceId = instanceId; + PubsubMessage pubsubMessage = sampleMessage(event); + messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock); + + verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture()); + EventMessage result = eventMessageCaptor.getValue(); + assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId); + } + + @Test + public void shouldParseEventObjectWithHeaderAndBodyProjectName() { + ProjectCreatedEvent event = new ProjectCreatedEvent(); + event.instanceId = "instance-id"; + event.projectName = "header_body_parser_project"; + PubsubMessage pubsubMessage = sampleMessage(event); + messageReceiver(succeedingConsumer).receiveMessage(pubsubMessage, ackReplyConsumerMock); + + verify(succeedingConsumer, only()).accept(any(EventMessage.class)); + } + + private PubsubMessage sampleMessage(Event event) { + String eventPayload = gson.toJson(event); + ByteString data = ByteString.copyFromUtf8(eventPayload); + return PubsubMessage.newBuilder().setData(data).build(); + } + + private PubsubMessage sampleMessage(EventMessage message) { + String eventPayload = gson.toJson(message); + ByteString data = ByteString.copyFromUtf8(eventPayload); + return PubsubMessage.newBuilder().setData(data).build(); + } + + private PubsubMessage sampleMessage() { + return sampleMessage(eventMessage); + } + private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) { return new PubSubEventSubscriber( - OutputFormat.JSON_COMPACT.newGson(), - subscriberProviderMock, - confMock, - pubSubSubscriberMetricsMock, - TOPIC, - consumer) + gson, subscriberProviderMock, confMock, pubSubSubscriberMetricsMock, TOPIC, consumer) .getMessageReceiver(); } }