Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Bump events-broker to v3.3.2

Change-Id: I5f4a99fa43a96ba267a608734e6f1323a9cae80b
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 67e5a5f..967f8dc 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -16,8 +16,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index 678d246..a0d9218 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -15,17 +15,21 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.WebSessionManager;
 import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
@@ -43,7 +47,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -62,6 +65,7 @@
   TimeMachine timeMachine;
   ExecutorService executor;
   private final WebSessionLogger webSessionLogger;
+  private String instanceId;
 
   @Inject
   public BrokerBasedWebSessionCache(
@@ -71,23 +75,24 @@
       PluginConfigFactory cfg,
       @PluginName String pluginName,
       WebSessionLogger webSessionLogger,
-      @WebSessionProducerExecutor ExecutorService executor) {
+      @WebSessionProducerExecutor ExecutorService executor,
+      @Nullable @GerritInstanceId String gerritInstanceId) {
     this.cache = cache;
     this.brokerApi = brokerApi;
     this.timeMachine = timeMachine;
     this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
     this.webSessionLogger = webSessionLogger;
     this.executor = executor;
+    this.instanceId = gerritInstanceId;
   }
 
-  protected void processMessage(EventMessage message) {
-    if (!WebSessionEvent.TYPE.equals(message.getEvent().getType())) {
-      logger.atWarning().log(
-          "Skipping web session message of unknown type:{}", message.getEvent().getType());
+  protected void processMessage(Event message) {
+    if (!WebSessionEvent.TYPE.equals(message.getType())) {
+      logger.atWarning().log("Skipping web session message of unknown type:{}", message.getType());
       return;
     }
 
-    WebSessionEvent event = (WebSessionEvent) message.getEvent();
+    WebSessionEvent event = (WebSessionEvent) message;
 
     switch (event.operation) {
       case ADD:
@@ -102,8 +107,7 @@
           }
 
         } catch (IOException | ClassNotFoundException e) {
-          logger.atSevere().withCause(e).log(
-              "Malformed event '%s': [Exception: %s]", message.getHeader());
+          logger.atSevere().withCause(e).log("Malformed event '%s': [Exception: %s]", message);
         }
         break;
       case REMOVE:
@@ -246,7 +250,6 @@
 
     @Override
     public void run() {
-      boolean succeeded = false;
       try (ByteArrayOutputStream out = new ByteArrayOutputStream();
           ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
 
@@ -254,15 +257,29 @@
         out.flush();
         byte[] serializedObject = out.toByteArray();
         WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
-        EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
-        succeeded = brokerApi.get().send(webSessionTopicName, message);
-        if (succeeded) {
-          webSessionLogger.log(
-              Direction.PUBLISH, webSessionTopicName, webSessionEvent, Optional.ofNullable(value));
-        } else {
-          logger.atSevere().log(
-              "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
-        }
+        webSessionEvent.instanceId = instanceId;
+        ListenableFuture<Boolean> resultF =
+            brokerApi.get().send(webSessionTopicName, webSessionEvent);
+        Futures.addCallback(
+            resultF,
+            new FutureCallback<Boolean>() {
+              @Override
+              public void onSuccess(Boolean aBoolean) {
+                webSessionLogger.log(
+                    Direction.PUBLISH,
+                    webSessionTopicName,
+                    webSessionEvent,
+                    Optional.ofNullable(value));
+              }
+
+              @Override
+              public void onFailure(Throwable throwable) {
+                logger.atSevere().log(
+                    "Cannot send web-session message for '%s Topic: '%s'",
+                    key, webSessionTopicName);
+              }
+            },
+            MoreExecutors.directExecutor());
       } catch (IOException e) {
         logger.atSevere().withCause(e).log(
             "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index 8bfeb56..c8afe82 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -15,15 +15,12 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -40,7 +37,6 @@
 import com.googlesource.gerrit.plugins.websession.broker.log.WebSessionLogger;
 import com.googlesource.gerrit.plugins.websession.broker.util.TimeMachine;
 import java.time.Instant;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,11 +75,13 @@
   @Mock PluginConfigFactory cfg;
   @Mock PluginConfig pluginConfig;
   @Mock WebSessionLogger webSessionLogger;
-  @Captor ArgumentCaptor<EventMessage> eventCaptor;
+  @Captor ArgumentCaptor<Event> eventCaptor;
   @Captor ArgumentCaptor<Val> valCaptor;
 
   BrokerBasedWebSessionCache objectUnderTest;
 
+  String instanceId = "instance-id";
+
   @Before
   public void setup() {
     cache = CacheBuilder.newBuilder().build();
@@ -94,20 +92,26 @@
     DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
     objectUnderTest =
         new BrokerBasedWebSessionCache(
-            cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger, executorServce);
+            cache,
+            item,
+            timeMachine,
+            cfg,
+            PLUGIN_NAME,
+            webSessionLogger,
+            executorServce,
+            instanceId);
   }
 
   @Test
   public void shouldPublishMessageWhenLoginEvent() {
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
     Val value = createVal(eventMessage);
-    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
 
     objectUnderTest.put(KEY, value);
     verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
 
-    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
-    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(eventCaptor.getValue()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
     assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.ADD);
     assertThat(event.key).isEqualTo(KEY);
     assertThat(event.payload).isEqualTo(defaultPayload);
@@ -115,15 +119,12 @@
 
   @Test
   public void shouldPublishMessageWhenLogoutEvent() {
-    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
-    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
-
     objectUnderTest.invalidate(KEY);
 
     verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
 
-    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
-    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(eventCaptor.getValue()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
     assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.REMOVE);
     assertThat(event.key).isEqualTo(KEY);
     assertThat(event.payload).isEqualTo(emptyPayload);
@@ -131,7 +132,7 @@
 
   @Test
   public void shouldUpdateCacheWhenLoginMessageReceived() {
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
 
     objectUnderTest.processMessage(eventMessage);
 
@@ -142,7 +143,7 @@
 
   @Test
   public void shouldUpdateCacheWhenLogoutMessageReceived() {
-    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
+    WebSessionEvent eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
     cache.put(KEY, VAL);
 
     objectUnderTest.processMessage(eventMessage);
@@ -154,7 +155,7 @@
   public void shouldCleanupExpiredSessions() {
     when(timeMachine.now()).thenReturn(Instant.MIN, Instant.MAX);
 
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
 
     objectUnderTest.processMessage(eventMessage);
 
@@ -166,30 +167,25 @@
     assertThat(cache.getIfPresent(eventMessageKey(eventMessage))).isNull();
   }
 
-  private Val createVal(EventMessage message) {
-    WebSessionEvent event = (WebSessionEvent) message.getEvent();
+  private Val createVal(Event message) {
+    WebSessionEvent event = (WebSessionEvent) message;
 
     objectUnderTest.processMessage(message);
     return cache.getIfPresent(event.key);
   }
 
-  private EventMessage createEventMessage() {
+  private WebSessionEvent createEventMessage() {
 
     return createEventMessage(defaultPayload, Operation.ADD);
   }
 
-  private String eventMessageKey(EventMessage eventMessage) {
-    WebSessionEvent sessionEvent = (WebSessionEvent) eventMessage.getEvent();
-    return sessionEvent.key;
+  private String eventMessageKey(WebSessionEvent eventMessage) {
+    return eventMessage.key;
   }
 
-  private EventMessage createEventMessage(byte[] payload, Operation operation) {
-
-    Header header =
-        new Header(
-            UUID.fromString("7cb80dbe-65c4-4f2c-84de-580d98199d4a"),
-            UUID.fromString("97711495-1013-414e-bfd2-44776787520d"));
+  private WebSessionEvent createEventMessage(byte[] payload, Operation operation) {
     WebSessionEvent event = new WebSessionEvent(KEY, payload, operation);
-    return new EventMessage(header, event);
+    event.instanceId = instanceId;
+    return event;
   }
 }