Do not block authentication operation during websession event send
Send websession events in separate thread to not block authentication
operation. This change helps to reduce performance impact when
event-broker is not performing properly.
Feature: Issue 12305
Change-Id: I340e7f8ac6387aa9226c33037b331875c13db96c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
index 733581c..0ad019c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
@@ -25,6 +25,7 @@
import com.google.gerrit.server.cache.CacheModule;
import com.google.gerrit.server.config.AuthConfig;
import com.google.gerrit.server.events.EventTypes;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Scopes;
@@ -34,11 +35,20 @@
import com.googlesource.gerrit.plugins.websession.broker.log.Log4jWebSessionLogger;
import com.googlesource.gerrit.plugins.websession.broker.log.WebSessionLogger;
import java.lang.annotation.Annotation;
+import java.util.concurrent.ExecutorService;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class BrokerBasedWebSession extends CacheBasedWebSession {
public static class Module extends CacheModule {
+ private WorkQueue workQueue;
+ private BrokerBasedWebSessionConfiguration configuration;
+
+ @Inject
+ public Module(WorkQueue workQueue, BrokerBasedWebSessionConfiguration configuration) {
+ this.workQueue = workQueue;
+ this.configuration = configuration;
+ }
@Override
protected void configure() {
@@ -50,6 +60,12 @@
.to(BrokerBasedWebSession.class)
.in(RequestScoped.class);
+ bind(ExecutorService.class)
+ .annotatedWith(WebSessionProducerExecutor.class)
+ .toInstance(
+ workQueue.createQueue(
+ configuration.getNumberOfThreads(), "websession-events-producer"));
+
listener(BrokerBasedWebSessionCache.class);
listener(BrokerBasedWebSessionCacheCleaner.class);
listener(Log4jWebSessionLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index 147fa93..678d246 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -47,6 +47,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
@Singleton
public class BrokerBasedWebSessionCache
@@ -59,6 +60,7 @@
String webSessionTopicName;
DynamicItem<BrokerApi> brokerApi;
TimeMachine timeMachine;
+ ExecutorService executor;
private final WebSessionLogger webSessionLogger;
@Inject
@@ -68,12 +70,14 @@
TimeMachine timeMachine,
PluginConfigFactory cfg,
@PluginName String pluginName,
- WebSessionLogger webSessionLogger) {
+ WebSessionLogger webSessionLogger,
+ @WebSessionProducerExecutor ExecutorService executor) {
this.cache = cache;
this.brokerApi = brokerApi;
this.timeMachine = timeMachine;
this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
this.webSessionLogger = webSessionLogger;
+ this.executor = executor;
}
protected void processMessage(EventMessage message) {
@@ -184,26 +188,11 @@
}
private void sendEvent(String key, Val value, WebSessionEvent.Operation operation) {
- boolean succeeded = false;
- try (ByteArrayOutputStream out = new ByteArrayOutputStream();
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
-
- objectOutputStream.writeObject(value);
- out.flush();
- byte[] serializedObject = out.toByteArray();
- WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
- EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
- succeeded = brokerApi.get().send(webSessionTopicName, message);
- if (succeeded) {
- webSessionLogger.log(
- Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
- } else {
- logger.atSevere().log(
- "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
- }
- } catch (IOException e) {
+ try {
+ executor.execute(new WebSessionEventTask(key, value, operation));
+ } catch (RuntimeException e) {
logger.atSevere().withCause(e).log(
- "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
+ "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
}
}
@@ -243,4 +232,41 @@
@Override
public void stop() {}
+
+ private class WebSessionEventTask implements Runnable {
+ private String key;
+ private Val value;
+ private WebSessionEvent.Operation operation;
+
+ public WebSessionEventTask(String key, Val value, WebSessionEvent.Operation operation) {
+ this.key = key;
+ this.value = value;
+ this.operation = operation;
+ }
+
+ @Override
+ public void run() {
+ boolean succeeded = false;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
+
+ objectOutputStream.writeObject(value);
+ out.flush();
+ byte[] serializedObject = out.toByteArray();
+ WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
+ EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
+ succeeded = brokerApi.get().send(webSessionTopicName, message);
+ if (succeeded) {
+ webSessionLogger.log(
+ Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
+ } else {
+ logger.atSevere().log(
+ "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
+ }
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java
new file mode 100644
index 0000000..8f103a7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionConfiguration.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+
+public class BrokerBasedWebSessionConfiguration {
+
+ private static final int DEFAULT_NUMBER_OF_THREADS = 1;
+
+ private final Integer numberOfThreads;
+
+ @Inject
+ public BrokerBasedWebSessionConfiguration(
+ PluginConfigFactory configFactory, @PluginName String pluginName) {
+
+ PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+
+ this.numberOfThreads = fromGerritConfig.getInt("numberOfThreads", DEFAULT_NUMBER_OF_THREADS);
+ }
+
+ public Integer getNumberOfThreads() {
+ return numberOfThreads;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java
new file mode 100644
index 0000000..5d59852
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionProducerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface WebSessionProducerExecutor {}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7ac2e37..a834904 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,6 +18,10 @@
: Name of the topic to use for publishing web session events.
Default: gerrit\_web\_session
+`plugin.websession-broker.numberOfThreads`
+: Number of threads used to send web session events via events-broker.
+ Default: 1
+
`plugin.websession-broker.cleanupInterval`
: Frequency of the expired web session cleanup operation.
Value should use common time unit suffixes to express their setting:
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index 60e995e..e6cb20e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -28,6 +28,7 @@
import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.common.cache.Cache;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.httpd.WebSessionManager.Val;
import com.google.gerrit.server.config.PluginConfig;
@@ -40,6 +41,7 @@
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -68,6 +70,8 @@
52, 121, 110, 65, 100, 110, 113, 99, 68, 45, 105, 99, 75, 97, 0, 120
};
+ ExecutorService executorServce = MoreExecutors.newDirectExecutorService();
+
@Mock BrokerApi brokerApi;
@Mock Cache<String, Val> cache;
@Mock TimeMachine timeMachine;
@@ -88,7 +92,7 @@
DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
objectUnderTest =
new BrokerBasedWebSessionCache(
- cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger);
+ cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger, executorServce);
}
@Test
@@ -98,7 +102,6 @@
when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
objectUnderTest.put(KEY, value);
-
verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
assertThat(eventCaptor.getValue().getEvent()).isNotNull();