Merge branch 'stable-3.5' into stable-3.6
* stable-3.5:
Consume events-broker from source
Bump up events-broker version to 3.5.1
Change-Id: I2c12e60802bdb4cf7a008d91972a34cc0ff8d212
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index fb3daaf..b8f4d18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -21,6 +21,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.pubsub.v1.PubsubMessage;
@@ -39,6 +41,7 @@
private final EventDeserializer eventsDeserializer;
private final PubSubSubscriberMetrics subscriberMetrics;
+ private final OneOffRequestContext oneOffRequestContext;
private final String topic;
private final Consumer<Event> messageProcessor;
private final SubscriberProvider subscriberProvider;
@@ -51,10 +54,12 @@
SubscriberProvider subscriberProvider,
PubSubConfiguration config,
PubSubSubscriberMetrics subscriberMetrics,
+ OneOffRequestContext oneOffRequestContext,
@Assisted String topic,
@Assisted Consumer<Event> messageProcessor) {
this.eventsDeserializer = eventsDeserializer;
this.subscriberMetrics = subscriberMetrics;
+ this.oneOffRequestContext = oneOffRequestContext;
this.topic = topic;
this.messageProcessor = messageProcessor;
this.subscriberProvider = subscriberProvider;
@@ -99,7 +104,7 @@
@VisibleForTesting
MessageReceiver getMessageReceiver() {
return (PubsubMessage message, AckReplyConsumer consumer) -> {
- try {
+ try (ManualRequestContext ctx = oneOffRequestContext.open()) {
Event event = eventsDeserializer.deserialize(message.getData().toStringUtf8());
messageProcessor.accept(event);
subscriberMetrics.incrementSucceedToConsumeMessage();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index c144d3b..d9d9fac 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -26,6 +26,7 @@
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
@@ -43,6 +44,7 @@
@Mock PubSubConfiguration confMock;
@Mock SubscriberProvider subscriberProviderMock;
@Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
+ @Mock OneOffRequestContext oneOffRequestContext;
@Mock AckReplyConsumer ackReplyConsumerMock;
@Mock Consumer<Event> succeedingConsumer;
@Captor ArgumentCaptor<Event> eventMessageCaptor;
@@ -123,6 +125,7 @@
subscriberProviderMock,
confMock,
pubSubSubscriberMetricsMock,
+ oneOffRequestContext,
TOPIC,
consumer)
.getMessageReceiver();