Do not block authentication operation during websession event send

Send websession events in separate thread to not block authentication
operation. This change helps to reduce performance impact when
event-broker is not performing properly.

Feature: Issue  12305
Change-Id: I340e7f8ac6387aa9226c33037b331875c13db96c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
index 733581c..0ad019c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
@@ -25,6 +25,7 @@
 import com.google.gerrit.server.cache.CacheModule;
 import com.google.gerrit.server.config.AuthConfig;
 import com.google.gerrit.server.events.EventTypes;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Scopes;
@@ -34,11 +35,20 @@
 import com.googlesource.gerrit.plugins.websession.broker.log.Log4jWebSessionLogger;
 import com.googlesource.gerrit.plugins.websession.broker.log.WebSessionLogger;
 import java.lang.annotation.Annotation;
+import java.util.concurrent.ExecutorService;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 public class BrokerBasedWebSession extends CacheBasedWebSession {
   public static class Module extends CacheModule {
+    private WorkQueue workQueue;
+    private BrokerBasedWebSessionConfiguration configuration;
+
+    @Inject
+    public Module(WorkQueue workQueue, BrokerBasedWebSessionConfiguration configuration) {
+      this.workQueue = workQueue;
+      this.configuration = configuration;
+    }
 
     @Override
     protected void configure() {
@@ -50,6 +60,12 @@
           .to(BrokerBasedWebSession.class)
           .in(RequestScoped.class);
 
+      bind(ExecutorService.class)
+          .annotatedWith(WebSessionProducerExecutor.class)
+          .toInstance(
+              workQueue.createQueue(
+                  configuration.getNumberOfThreads(), "websession-events-producer"));
+
       listener(BrokerBasedWebSessionCache.class);
       listener(BrokerBasedWebSessionCacheCleaner.class);
       listener(Log4jWebSessionLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index 147fa93..678d246 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 
 @Singleton
 public class BrokerBasedWebSessionCache
@@ -59,6 +60,7 @@
   String webSessionTopicName;
   DynamicItem<BrokerApi> brokerApi;
   TimeMachine timeMachine;
+  ExecutorService executor;
   private final WebSessionLogger webSessionLogger;
 
   @Inject
@@ -68,12 +70,14 @@
       TimeMachine timeMachine,
       PluginConfigFactory cfg,
       @PluginName String pluginName,
-      WebSessionLogger webSessionLogger) {
+      WebSessionLogger webSessionLogger,
+      @WebSessionProducerExecutor ExecutorService executor) {
     this.cache = cache;
     this.brokerApi = brokerApi;
     this.timeMachine = timeMachine;
     this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
     this.webSessionLogger = webSessionLogger;
+    this.executor = executor;
   }
 
   protected void processMessage(EventMessage message) {
@@ -184,26 +188,11 @@
   }
 
   private void sendEvent(String key, Val value, WebSessionEvent.Operation operation) {
-    boolean succeeded = false;
-    try (ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
-
-      objectOutputStream.writeObject(value);
-      out.flush();
-      byte[] serializedObject = out.toByteArray();
-      WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
-      EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
-      succeeded = brokerApi.get().send(webSessionTopicName, message);
-      if (succeeded) {
-        webSessionLogger.log(
-            Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
-      } else {
-        logger.atSevere().log(
-            "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
-      }
-    } catch (IOException e) {
+    try {
+      executor.execute(new WebSessionEventTask(key, value, operation));
+    } catch (RuntimeException e) {
       logger.atSevere().withCause(e).log(
-          "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
+          "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
     }
   }
 
@@ -243,4 +232,41 @@
 
   @Override
   public void stop() {}
+
+  private class WebSessionEventTask implements Runnable {
+    private String key;
+    private Val value;
+    private WebSessionEvent.Operation operation;
+
+    public WebSessionEventTask(String key, Val value, WebSessionEvent.Operation operation) {
+      this.key = key;
+      this.value = value;
+      this.operation = operation;
+    }
+
+    @Override
+    public void run() {
+      boolean succeeded = false;
+      try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+          ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
+
+        objectOutputStream.writeObject(value);
+        out.flush();
+        byte[] serializedObject = out.toByteArray();
+        WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
+        EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
+        succeeded = brokerApi.get().send(webSessionTopicName, message);
+        if (succeeded) {
+          webSessionLogger.log(
+              Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
+        } else {
+          logger.atSevere().log(
+              "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
+        }
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java
new file mode 100644
index 0000000..8f103a7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+
+public class BrokerBasedWebSessionConfiguration {
+
+  private static final int DEFAULT_NUMBER_OF_THREADS = 1;
+
+  private final Integer numberOfThreads;
+
+  @Inject
+  public BrokerBasedWebSessionConfiguration(
+      PluginConfigFactory configFactory, @PluginName String pluginName) {
+
+    PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+
+    this.numberOfThreads = fromGerritConfig.getInt("numberOfThreads", DEFAULT_NUMBER_OF_THREADS);
+  }
+
+  public Integer getNumberOfThreads() {
+    return numberOfThreads;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java
new file mode 100644
index 0000000..5d59852
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface WebSessionProducerExecutor {}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7ac2e37..a834904 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,6 +18,10 @@
 :   Name of the topic to use for publishing web session events.
     Default: gerrit\_web\_session
 
+`plugin.websession-broker.numberOfThreads`
+:   Number of threads used to send web session events via events-broker.
+    Default: 1
+
 `plugin.websession-broker.cleanupInterval`
 :   Frequency of the expired web session cleanup operation.
     Value should use common time unit suffixes to express their setting:
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index 60e995e..e6cb20e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -28,6 +28,7 @@
 import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.cache.Cache;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.WebSessionManager.Val;
 import com.google.gerrit.server.config.PluginConfig;
@@ -40,6 +41,7 @@
 import java.time.Instant;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,6 +70,8 @@
         52, 121, 110, 65, 100, 110, 113, 99, 68, 45, 105, 99, 75, 97, 0, 120
       };
 
+  ExecutorService executorServce = MoreExecutors.newDirectExecutorService();
+
   @Mock BrokerApi brokerApi;
   @Mock Cache<String, Val> cache;
   @Mock TimeMachine timeMachine;
@@ -88,7 +92,7 @@
     DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
     objectUnderTest =
         new BrokerBasedWebSessionCache(
-            cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger);
+            cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger, executorServce);
   }
 
   @Test
@@ -98,7 +102,6 @@
     when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
 
     objectUnderTest.put(KEY, value);
-
     verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
 
     assertThat(eventCaptor.getValue().getEvent()).isNotNull();