Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Bump events-broker to v3.3.2
Change-Id: I5f4a99fa43a96ba267a608734e6f1323a9cae80b
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 67e5a5f..967f8dc 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -16,8 +16,8 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+ artifact = "com.gerritforge:events-broker:3.4.0.4",
+ sha1 = "8d361d863382290e33828116e65698190118d0f1",
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index 678d246..a0d9218 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -15,17 +15,21 @@
package com.googlesource.gerrit.plugins.websession.broker;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.httpd.WebSessionManager;
import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.config.PluginConfigFactory;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
@@ -43,7 +47,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -62,6 +65,7 @@
TimeMachine timeMachine;
ExecutorService executor;
private final WebSessionLogger webSessionLogger;
+ private String instanceId;
@Inject
public BrokerBasedWebSessionCache(
@@ -71,23 +75,24 @@
PluginConfigFactory cfg,
@PluginName String pluginName,
WebSessionLogger webSessionLogger,
- @WebSessionProducerExecutor ExecutorService executor) {
+ @WebSessionProducerExecutor ExecutorService executor,
+ @Nullable @GerritInstanceId String gerritInstanceId) {
this.cache = cache;
this.brokerApi = brokerApi;
this.timeMachine = timeMachine;
this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
this.webSessionLogger = webSessionLogger;
this.executor = executor;
+ this.instanceId = gerritInstanceId;
}
- protected void processMessage(EventMessage message) {
- if (!WebSessionEvent.TYPE.equals(message.getEvent().getType())) {
- logger.atWarning().log(
- "Skipping web session message of unknown type:{}", message.getEvent().getType());
+ protected void processMessage(Event message) {
+ if (!WebSessionEvent.TYPE.equals(message.getType())) {
+ logger.atWarning().log("Skipping web session message of unknown type:{}", message.getType());
return;
}
- WebSessionEvent event = (WebSessionEvent) message.getEvent();
+ WebSessionEvent event = (WebSessionEvent) message;
switch (event.operation) {
case ADD:
@@ -102,8 +107,7 @@
}
} catch (IOException | ClassNotFoundException e) {
- logger.atSevere().withCause(e).log(
- "Malformed event '%s': [Exception: %s]", message.getHeader());
+ logger.atSevere().withCause(e).log("Malformed event '%s': [Exception: %s]", message);
}
break;
case REMOVE:
@@ -246,7 +250,6 @@
@Override
public void run() {
- boolean succeeded = false;
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
@@ -254,15 +257,29 @@
out.flush();
byte[] serializedObject = out.toByteArray();
WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
- EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
- succeeded = brokerApi.get().send(webSessionTopicName, message);
- if (succeeded) {
- webSessionLogger.log(
- Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
- } else {
- logger.atSevere().log(
- "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
- }
+ webSessionEvent.instanceId = instanceId;
+ ListenableFuture<Boolean> resultF =
+ brokerApi.get().send(webSessionTopicName, webSessionEvent);
+ Futures.addCallback(
+ resultF,
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean aBoolean) {
+ webSessionLogger.log(
+ Direction.PUBLISH,
+ webSessionTopicName,
+ webSessionEvent,
+ Optional.ofNullable(value));
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ logger.atSevere().log(
+ "Cannot send web-session message for '%s Topic: '%s'",
+ key, webSessionTopicName);
+ }
+ },
+ MoreExecutors.directExecutor());
} catch (IOException e) {
logger.atSevere().withCause(e).log(
"Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index 8bfeb56..c8afe82 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -15,15 +15,12 @@
package com.googlesource.gerrit.plugins.websession.broker;
import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.MoreExecutors;
@@ -40,7 +37,6 @@
import com.googlesource.gerrit.plugins.websession.broker.log.WebSessionLogger;
import com.googlesource.gerrit.plugins.websession.broker.util.TimeMachine;
import java.time.Instant;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import org.junit.Before;
import org.junit.Test;
@@ -79,11 +75,13 @@
@Mock PluginConfigFactory cfg;
@Mock PluginConfig pluginConfig;
@Mock WebSessionLogger webSessionLogger;
- @Captor ArgumentCaptor<EventMessage> eventCaptor;
+ @Captor ArgumentCaptor<Event> eventCaptor;
@Captor ArgumentCaptor<Val> valCaptor;
BrokerBasedWebSessionCache objectUnderTest;
+ String instanceId = "instance-id";
+
@Before
public void setup() {
cache = CacheBuilder.newBuilder().build();
@@ -94,20 +92,26 @@
DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
objectUnderTest =
new BrokerBasedWebSessionCache(
- cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger, executorServce);
+ cache,
+ item,
+ timeMachine,
+ cfg,
+ PLUGIN_NAME,
+ webSessionLogger,
+ executorServce,
+ instanceId);
}
@Test
public void shouldPublishMessageWhenLoginEvent() {
- EventMessage eventMessage = createEventMessage();
+ WebSessionEvent eventMessage = createEventMessage();
Val value = createVal(eventMessage);
- when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
objectUnderTest.put(KEY, value);
verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
- assertThat(eventCaptor.getValue().getEvent()).isNotNull();
- WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+ assertThat(eventCaptor.getValue()).isNotNull();
+ WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.ADD);
assertThat(event.key).isEqualTo(KEY);
assertThat(event.payload).isEqualTo(defaultPayload);
@@ -115,15 +119,12 @@
@Test
public void shouldPublishMessageWhenLogoutEvent() {
- EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
- when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
-
objectUnderTest.invalidate(KEY);
verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
- assertThat(eventCaptor.getValue().getEvent()).isNotNull();
- WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+ assertThat(eventCaptor.getValue()).isNotNull();
+ WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.REMOVE);
assertThat(event.key).isEqualTo(KEY);
assertThat(event.payload).isEqualTo(emptyPayload);
@@ -131,7 +132,7 @@
@Test
public void shouldUpdateCacheWhenLoginMessageReceived() {
- EventMessage eventMessage = createEventMessage();
+ WebSessionEvent eventMessage = createEventMessage();
objectUnderTest.processMessage(eventMessage);
@@ -142,7 +143,7 @@
@Test
public void shouldUpdateCacheWhenLogoutMessageReceived() {
- EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
+ WebSessionEvent eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
cache.put(KEY, VAL);
objectUnderTest.processMessage(eventMessage);
@@ -154,7 +155,7 @@
public void shouldCleanupExpiredSessions() {
when(timeMachine.now()).thenReturn(Instant.MIN, Instant.MAX);
- EventMessage eventMessage = createEventMessage();
+ WebSessionEvent eventMessage = createEventMessage();
objectUnderTest.processMessage(eventMessage);
@@ -166,30 +167,25 @@
assertThat(cache.getIfPresent(eventMessageKey(eventMessage))).isNull();
}
- private Val createVal(EventMessage message) {
- WebSessionEvent event = (WebSessionEvent) message.getEvent();
+ private Val createVal(Event message) {
+ WebSessionEvent event = (WebSessionEvent) message;
objectUnderTest.processMessage(message);
return cache.getIfPresent(event.key);
}
- private EventMessage createEventMessage() {
+ private WebSessionEvent createEventMessage() {
return createEventMessage(defaultPayload, Operation.ADD);
}
- private String eventMessageKey(EventMessage eventMessage) {
- WebSessionEvent sessionEvent = (WebSessionEvent) eventMessage.getEvent();
- return sessionEvent.key;
+ private String eventMessageKey(WebSessionEvent eventMessage) {
+ return eventMessage.key;
}
- private EventMessage createEventMessage(byte[] payload, Operation operation) {
-
- Header header =
- new Header(
- UUID.fromString("7cb80dbe-65c4-4f2c-84de-580d98199d4a"),
- UUID.fromString("97711495-1013-414e-bfd2-44776787520d"));
+ private WebSessionEvent createEventMessage(byte[] payload, Operation operation) {
WebSessionEvent event = new WebSessionEvent(KEY, payload, operation);
- return new EventMessage(header, event);
+ event.instanceId = instanceId;
+ return event;
}
}