Send/receive Event object instead of EventMessage

Event object contains instance id populated by Gerrit Core. Workaround
with EventMessage using sourceInstanceId field to recognise the event
source node is not needed anymore. Use Event object instead of
EventMessage.

Bug: Issue 14390
Change-Id: I249e808b7f56eeb94d65518fb0e1fd3b8913f9f0
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 4501db7..967f8dc 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -16,8 +16,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0-rc2",
-        sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index bc78599..a0d9218 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableMap;
@@ -30,6 +29,7 @@
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.WebSessionManager;
 import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
@@ -47,7 +47,6 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -66,6 +65,7 @@
   TimeMachine timeMachine;
   ExecutorService executor;
   private final WebSessionLogger webSessionLogger;
+  private String instanceId;
 
   @Inject
   public BrokerBasedWebSessionCache(
@@ -75,23 +75,24 @@
       PluginConfigFactory cfg,
       @PluginName String pluginName,
       WebSessionLogger webSessionLogger,
-      @WebSessionProducerExecutor ExecutorService executor) {
+      @WebSessionProducerExecutor ExecutorService executor,
+      @Nullable @GerritInstanceId String gerritInstanceId) {
     this.cache = cache;
     this.brokerApi = brokerApi;
     this.timeMachine = timeMachine;
     this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
     this.webSessionLogger = webSessionLogger;
     this.executor = executor;
+    this.instanceId = gerritInstanceId;
   }
 
-  protected void processMessage(EventMessage message) {
-    if (!WebSessionEvent.TYPE.equals(message.getEvent().getType())) {
-      logger.atWarning().log(
-          "Skipping web session message of unknown type:{}", message.getEvent().getType());
+  protected void processMessage(Event message) {
+    if (!WebSessionEvent.TYPE.equals(message.getType())) {
+      logger.atWarning().log("Skipping web session message of unknown type:{}", message.getType());
       return;
     }
 
-    WebSessionEvent event = (WebSessionEvent) message.getEvent();
+    WebSessionEvent event = (WebSessionEvent) message;
 
     switch (event.operation) {
       case ADD:
@@ -106,8 +107,7 @@
           }
 
         } catch (IOException | ClassNotFoundException e) {
-          logger.atSevere().withCause(e).log(
-              "Malformed event '%s': [Exception: %s]", message.getHeader());
+          logger.atSevere().withCause(e).log("Malformed event '%s': [Exception: %s]", message);
         }
         break;
       case REMOVE:
@@ -257,8 +257,9 @@
         out.flush();
         byte[] serializedObject = out.toByteArray();
         WebSessionEvent webSessionEvent = new WebSessionEvent(key, serializedObject, operation);
-        EventMessage message = brokerApi.get().newMessage(UUID.randomUUID(), webSessionEvent);
-        ListenableFuture<Boolean> resultF = brokerApi.get().send(webSessionTopicName, message);
+        webSessionEvent.instanceId = instanceId;
+        ListenableFuture<Boolean> resultF =
+            brokerApi.get().send(webSessionTopicName, webSessionEvent);
         Futures.addCallback(
             resultF,
             new FutureCallback<Boolean>() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index 8bfeb56..c8afe82 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -15,15 +15,12 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -40,7 +37,6 @@
 import com.googlesource.gerrit.plugins.websession.broker.log.WebSessionLogger;
 import com.googlesource.gerrit.plugins.websession.broker.util.TimeMachine;
 import java.time.Instant;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,11 +75,13 @@
   @Mock PluginConfigFactory cfg;
   @Mock PluginConfig pluginConfig;
   @Mock WebSessionLogger webSessionLogger;
-  @Captor ArgumentCaptor<EventMessage> eventCaptor;
+  @Captor ArgumentCaptor<Event> eventCaptor;
   @Captor ArgumentCaptor<Val> valCaptor;
 
   BrokerBasedWebSessionCache objectUnderTest;
 
+  String instanceId = "instance-id";
+
   @Before
   public void setup() {
     cache = CacheBuilder.newBuilder().build();
@@ -94,20 +92,26 @@
     DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
     objectUnderTest =
         new BrokerBasedWebSessionCache(
-            cache, item, timeMachine, cfg, PLUGIN_NAME, webSessionLogger, executorServce);
+            cache,
+            item,
+            timeMachine,
+            cfg,
+            PLUGIN_NAME,
+            webSessionLogger,
+            executorServce,
+            instanceId);
   }
 
   @Test
   public void shouldPublishMessageWhenLoginEvent() {
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
     Val value = createVal(eventMessage);
-    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
 
     objectUnderTest.put(KEY, value);
     verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
 
-    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
-    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(eventCaptor.getValue()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
     assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.ADD);
     assertThat(event.key).isEqualTo(KEY);
     assertThat(event.payload).isEqualTo(defaultPayload);
@@ -115,15 +119,12 @@
 
   @Test
   public void shouldPublishMessageWhenLogoutEvent() {
-    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
-    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
-
     objectUnderTest.invalidate(KEY);
 
     verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
 
-    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
-    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(eventCaptor.getValue()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue();
     assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.REMOVE);
     assertThat(event.key).isEqualTo(KEY);
     assertThat(event.payload).isEqualTo(emptyPayload);
@@ -131,7 +132,7 @@
 
   @Test
   public void shouldUpdateCacheWhenLoginMessageReceived() {
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
 
     objectUnderTest.processMessage(eventMessage);
 
@@ -142,7 +143,7 @@
 
   @Test
   public void shouldUpdateCacheWhenLogoutMessageReceived() {
-    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
+    WebSessionEvent eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
     cache.put(KEY, VAL);
 
     objectUnderTest.processMessage(eventMessage);
@@ -154,7 +155,7 @@
   public void shouldCleanupExpiredSessions() {
     when(timeMachine.now()).thenReturn(Instant.MIN, Instant.MAX);
 
-    EventMessage eventMessage = createEventMessage();
+    WebSessionEvent eventMessage = createEventMessage();
 
     objectUnderTest.processMessage(eventMessage);
 
@@ -166,30 +167,25 @@
     assertThat(cache.getIfPresent(eventMessageKey(eventMessage))).isNull();
   }
 
-  private Val createVal(EventMessage message) {
-    WebSessionEvent event = (WebSessionEvent) message.getEvent();
+  private Val createVal(Event message) {
+    WebSessionEvent event = (WebSessionEvent) message;
 
     objectUnderTest.processMessage(message);
     return cache.getIfPresent(event.key);
   }
 
-  private EventMessage createEventMessage() {
+  private WebSessionEvent createEventMessage() {
 
     return createEventMessage(defaultPayload, Operation.ADD);
   }
 
-  private String eventMessageKey(EventMessage eventMessage) {
-    WebSessionEvent sessionEvent = (WebSessionEvent) eventMessage.getEvent();
-    return sessionEvent.key;
+  private String eventMessageKey(WebSessionEvent eventMessage) {
+    return eventMessage.key;
   }
 
-  private EventMessage createEventMessage(byte[] payload, Operation operation) {
-
-    Header header =
-        new Header(
-            UUID.fromString("7cb80dbe-65c4-4f2c-84de-580d98199d4a"),
-            UUID.fromString("97711495-1013-414e-bfd2-44776787520d"));
+  private WebSessionEvent createEventMessage(byte[] payload, Operation operation) {
     WebSessionEvent event = new WebSessionEvent(KEY, payload, operation);
-    return new EventMessage(header, event);
+    event.instanceId = instanceId;
+    return event;
   }
 }