Process receive events as InternalUser, set request context

Same as in the events-kafka plugin we must set proper thread local
context for the events subscriber thread(s). If this is not done then
the current user is the anonymous user and, as an example for the
pull-replication plugin, project creation doesn't work (unless project
creation would be enabled for anonymous users).

Change-Id: I3ecb379d327b62a7964706a12c0c158a7d0a13d6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index fb3daaf..b8f4d18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -21,6 +21,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.pubsub.v1.PubsubMessage;
@@ -39,6 +41,7 @@
 
   private final EventDeserializer eventsDeserializer;
   private final PubSubSubscriberMetrics subscriberMetrics;
+  private final OneOffRequestContext oneOffRequestContext;
   private final String topic;
   private final Consumer<Event> messageProcessor;
   private final SubscriberProvider subscriberProvider;
@@ -51,10 +54,12 @@
       SubscriberProvider subscriberProvider,
       PubSubConfiguration config,
       PubSubSubscriberMetrics subscriberMetrics,
+      OneOffRequestContext oneOffRequestContext,
       @Assisted String topic,
       @Assisted Consumer<Event> messageProcessor) {
     this.eventsDeserializer = eventsDeserializer;
     this.subscriberMetrics = subscriberMetrics;
+    this.oneOffRequestContext = oneOffRequestContext;
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     this.subscriberProvider = subscriberProvider;
@@ -99,7 +104,7 @@
   @VisibleForTesting
   MessageReceiver getMessageReceiver() {
     return (PubsubMessage message, AckReplyConsumer consumer) -> {
-      try {
+      try (ManualRequestContext ctx = oneOffRequestContext.open()) {
         Event event = eventsDeserializer.deserialize(message.getData().toStringUtf8());
         messageProcessor.accept(event);
         subscriberMetrics.incrementSucceedToConsumeMessage();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
index c144d3b..d9d9fac 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.protobuf.ByteString;
 import com.google.pubsub.v1.PubsubMessage;
@@ -43,6 +44,7 @@
   @Mock PubSubConfiguration confMock;
   @Mock SubscriberProvider subscriberProviderMock;
   @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
+  @Mock OneOffRequestContext oneOffRequestContext;
   @Mock AckReplyConsumer ackReplyConsumerMock;
   @Mock Consumer<Event> succeedingConsumer;
   @Captor ArgumentCaptor<Event> eventMessageCaptor;
@@ -123,6 +125,7 @@
             subscriberProviderMock,
             confMock,
             pubSubSubscriberMetricsMock,
+            oneOffRequestContext,
             TOPIC,
             consumer)
         .getMessageReceiver();