Create a broker based shareable websession cache

The plugin replaces the builtin Gerrit WebSession implementation
with one that uses a broker based cache. This allows to share
sessions without common file system. This is partically useful for
multi-site scenario.

Feature: Issue 11600
Change-Id: I9d2fa70d5292d95d495bc391bfca25e82afe063a
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..cdd7f31
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,44 @@
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+    "//tools/bzl:plugin.bzl",
+    "gerrit_plugin",
+    "PLUGIN_DEPS",
+    "PLUGIN_TEST_DEPS",
+)
+
+gerrit_plugin(
+    name = "websession-broker",
+    srcs = glob(["src/main/java/**/*.java"]),
+    resources = glob(["src/main/resources/**/*"]),
+    deps = [
+        "@events-broker//jar",
+    ],
+    manifest_entries = [
+        "Gerrit-PluginName: websession-broker",
+        "Gerrit-Module: com.googlesource.gerrit.plugins.websession.broker.Module",
+        "Gerrit-HttpModule: com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSession$Module",
+        "Implementation-Title: Broker WebSession",
+        "Implementation-URL: https://review.gerrithub.io/admin/repos/GerritForge/plugins_websession-broker",
+    ],
+)
+
+junit_tests(
+    name = "websession-broker_tests",
+    srcs = glob(["src/test/java/**/*.java"]),
+    resources = glob(["src/test/resources/**/*"]),
+    tags = ["websession-broker"],
+    deps = [
+        ":websession-broker__plugin_test_deps",
+    ],
+)
+
+java_library(
+    name = "websession-broker__plugin_test_deps",
+    testonly = 1,
+    visibility = ["//visibility:public"],
+    exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+        ":websession-broker__plugin",
+        "@mockito//jar",
+        "@events-broker//jar",
+    ],
+)
diff --git a/WORKSPACE b/WORKSPACE
new file mode 100644
index 0000000..47eccca
--- /dev/null
+++ b/WORKSPACE
@@ -0,0 +1,5 @@
+workspace(name = "websession_broker")
+
+load("//:external_plugin_deps.bzl", "external_plugin_deps")
+
+external_plugin_deps()
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..60cdaea
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,39 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():
+    maven_jar(
+        name = "mockito",
+        artifact = "org.mockito:mockito-core:2.27.0",
+        sha1 = "835fc3283b481f4758b8ef464cd560c649c08b00",
+        deps = [
+            "@byte-buddy//jar",
+            "@byte-buddy-agent//jar",
+            "@objenesis//jar",
+        ],
+    )
+
+    BYTE_BUDDY_VERSION = "1.9.10"
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.0.3",
+        sha1 = "efdc5bf6897563e2f6f85bfc1b8a5d65e3393424",
+    )
+
+    maven_jar(
+        name = "byte-buddy",
+        artifact = "net.bytebuddy:byte-buddy:" + BYTE_BUDDY_VERSION,
+        sha1 = "211a2b4d3df1eeef2a6cacf78d74a1f725e7a840",
+    )
+
+    maven_jar(
+        name = "byte-buddy-agent",
+        artifact = "net.bytebuddy:byte-buddy-agent:" + BYTE_BUDDY_VERSION,
+        sha1 = "9674aba5ee793e54b864952b001166848da0f26b",
+    )
+
+    maven_jar(
+        name = "objenesis",
+        artifact = "org.objenesis:objenesis:2.6",
+        sha1 = "639033469776fd37c08358c6b92a4761feb2af4b",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
new file mode 100644
index 0000000..3d366be
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import com.google.gerrit.extensions.annotations.RootRelative;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.httpd.CacheBasedWebSession;
+import com.google.gerrit.httpd.WebSession;
+import com.google.gerrit.httpd.WebSessionManagerFactory;
+import com.google.gerrit.server.AnonymousUser;
+import com.google.gerrit.server.IdentifiedUser.RequestFactory;
+import com.google.gerrit.server.cache.CacheModule;
+import com.google.gerrit.server.config.AuthConfig;
+import com.google.gerrit.server.events.EventTypes;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.servlet.RequestScoped;
+import com.google.inject.servlet.ServletScopes;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+public class BrokerBasedWebSession extends CacheBasedWebSession {
+  public static class Module extends CacheModule {
+
+    @Override
+    protected void configure() {
+      EventTypes.register(
+          BrokerBasedWebSessionCache.WebSessionEvent.TYPE,
+          BrokerBasedWebSessionCache.WebSessionEvent.class);
+      bindScope(RequestScoped.class, ServletScopes.REQUEST);
+      DynamicItem.bind(binder(), WebSession.class)
+          .to(BrokerBasedWebSession.class)
+          .in(RequestScoped.class);
+    }
+  }
+
+  @Inject
+  BrokerBasedWebSession(
+      @RootRelative Provider<HttpServletRequest> request,
+      @RootRelative Provider<HttpServletResponse> response,
+      WebSessionManagerFactory managerFactory,
+      BrokerBasedWebSessionCache cache,
+      AuthConfig authConfig,
+      Provider<AnonymousUser> anonymousProvider,
+      RequestFactory identified) {
+    super(
+        request.get(),
+        response.get(),
+        managerFactory.create(cache),
+        authConfig,
+        anonymousProvider,
+        identified);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
new file mode 100644
index 0000000..1cc4827
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -0,0 +1,201 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.httpd.WebSessionManager;
+import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.events.Event;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+@Singleton
+public class BrokerBasedWebSessionCache implements Cache<String, WebSessionManager.Val> {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  Cache<String, Val> cache;
+  String webSessionTopicName;
+  DynamicItem<BrokerApi> brokerApi;
+
+  @Inject
+  public BrokerBasedWebSessionCache(
+      @Named(WebSessionManager.CACHE_NAME) Cache<String, Val> cache,
+      @WebSessionTopicName String webSessionTopicName,
+      DynamicItem<BrokerApi> brokerApi) {
+    this.cache = cache;
+    this.webSessionTopicName = webSessionTopicName;
+    this.brokerApi = brokerApi;
+    this.brokerApi.get().receiveAsync(webSessionTopicName, this::processMessage);
+  }
+
+  protected void processMessage(EventMessage message) {
+    if (!WebSessionEvent.TYPE.equals(message.getEvent().getType())) {
+      logger.atWarning().log(
+          "Skipping web session message of unknown type:{}", message.getEvent().getType());
+      return;
+    }
+
+    WebSessionEvent event = (WebSessionEvent) message.getEvent();
+
+    switch (event.operation) {
+      case ADD:
+        try (ByteArrayInputStream in = new ByteArrayInputStream(event.payload);
+            ObjectInputStream inputStream = new ObjectInputStream(in)) {
+
+          cache.put(event.key, (Val) inputStream.readObject());
+        } catch (IOException | ClassNotFoundException e) {
+          logger.atSevere().withCause(e).log(
+              "Malformed event '%s': [Exception: %s]", message.getHeader());
+        }
+        break;
+      case REMOVE:
+        cache.invalidate(event.key);
+        break;
+      default:
+        logger.atWarning().log(
+            "Skipping web session message of unknown operation type:{}", event.operation);
+        break;
+    }
+  }
+
+  @Override
+  public @Nullable Val getIfPresent(Object key) {
+    return cache.getIfPresent(key);
+  }
+
+  @Override
+  public Val get(String key, Callable<? extends Val> valueLoader) throws ExecutionException {
+    return cache.get(key, valueLoader);
+  }
+
+  @Override
+  public ImmutableMap<String, Val> getAllPresent(Iterable<?> keys) {
+    return cache.getAllPresent(keys);
+  }
+
+  @Override
+  public void put(String key, Val value) {
+    sendEvent(key, value, WebSessionEvent.Operation.ADD);
+    cache.put(key, value);
+  }
+
+  @Override
+  public void putAll(Map<? extends String, ? extends Val> keys) {
+    for (Entry<? extends String, ? extends Val> e : keys.entrySet()) {
+      put(e.getKey(), e.getValue());
+    }
+  }
+
+  @Override
+  public void invalidate(Object key) {
+    sendEvent((String) key, null, WebSessionEvent.Operation.REMOVE);
+    cache.invalidate(key);
+  }
+
+  @Override
+  public void invalidateAll(Iterable<?> keys) {
+    for (Object key : keys) {
+      invalidate(key);
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    cache.asMap().forEach((key, value) -> invalidate(key));
+  }
+
+  @Override
+  public long size() {
+    return cache.size();
+  }
+
+  @Override
+  public CacheStats stats() {
+    return cache.stats();
+  }
+
+  @Override
+  public ConcurrentMap<String, Val> asMap() {
+    return cache.asMap();
+  }
+
+  @Override
+  public void cleanUp() {
+    cache.cleanUp();
+  }
+
+  private void sendEvent(String key, Val value, WebSessionEvent.Operation operation) {
+    boolean succeeded = false;
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ObjectOutputStream objectOutputStream = new ObjectOutputStream(out)) {
+
+      objectOutputStream.writeObject(value);
+      out.flush();
+      byte[] serializedObject = out.toByteArray();
+      EventMessage message =
+          brokerApi
+              .get()
+              .newMessage(UUID.randomUUID(), new WebSessionEvent(key, serializedObject, operation));
+      succeeded = brokerApi.get().send(webSessionTopicName, message);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot serialize event for account id '%s': [Exception: %s]", value.getAccountId());
+    } finally {
+      if (!succeeded)
+        logger.atSevere().log(
+            "Cannot send web-session message for '%s Topic: '%s'", key, webSessionTopicName);
+    }
+  }
+
+  public static class WebSessionEvent extends Event {
+
+    public enum Operation {
+      ADD,
+      REMOVE;
+    }
+
+    static final String TYPE = "web-session";
+    public String key;
+    public byte[] payload;
+    public Operation operation;
+
+    protected WebSessionEvent(String key, byte[] payload, Operation operation) {
+      super(TYPE);
+      this.key = key;
+      this.payload = payload;
+      this.operation = operation;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java
new file mode 100644
index 0000000..864ed8f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+
+public class Module extends LifecycleModule {
+  private static String DEFAULT_WEB_SESSION_TOPIC = "gerrit_web_session";
+
+  DynamicItem<BrokerApi> brokerApi;
+
+  @Override
+  protected void configure() {
+    if (brokerApi == null) {
+      DynamicItem.itemOf(binder(), BrokerApi.class);
+    }
+  }
+
+  @Inject(optional = true)
+  public void setBrokerApi(DynamicItem<BrokerApi> brokerApi) {
+    this.brokerApi = brokerApi;
+  }
+
+  @Provides
+  @Singleton
+  @WebSessionTopicName
+  public String getWebSessionTopicName(PluginConfigFactory cfg, @PluginName String pluginName) {
+    return cfg.getFromGerritConfig(pluginName)
+        .getString("webSessionTopic", DEFAULT_WEB_SESSION_TOPIC);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java
new file mode 100644
index 0000000..e7b191b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface WebSessionTopicName {}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
new file mode 100644
index 0000000..cb65323
--- /dev/null
+++ b/src/main/resources/Documentation/about.md
@@ -0,0 +1,17 @@
+This plugin replaces the built-in Gerrit H2 based websession
+cache with a broker based implementation. This allows to share
+sessions without common file system. This is particularly useful for
+multi-site scenario.
+
+## Setup
+
+Prerequisites:
+
+* A message broker implementation has to be deployed across all the sites
+
+For the masters:
+
+* Install and configure @PLUGIN@ plugin
+
+For further information and supported options, refer to [config](config.md)
+documentation.
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
new file mode 100644
index 0000000..d18e927
--- /dev/null
+++ b/src/main/resources/Documentation/build.md
@@ -0,0 +1,43 @@
+Build
+=====
+
+This @PLUGIN@ plugin is built with Bazel standard in tree [build](https://gerrit-review.googlesource.com/Documentation/dev-build-plugins.html#_bazel_in_tree_driven).
+
+Clone (or link) this plugin to the `plugins` directory of Gerrit's
+source tree. Put the external dependency Bazel build file into
+the Gerrit /plugins directory, replacing the existing empty one.
+
+```
+  cd gerrit/plugins
+  rm external_plugin_deps.bzl
+  ln -s @PLUGIN@/external_plugin_deps.bzl .
+```
+
+From the Gerrit source tree issue the command:
+
+```
+  bazel build plugins/@PLUGIN@
+```
+
+The output is created in
+
+```
+  bazel-genfiles/plugins/@PLUGIN@/@PLUGIN@.jar
+```
+
+This project can be imported into the Eclipse IDE.
+Add the plugin name to the `CUSTOM_PLUGINS` set in
+Gerrit core in `tools/bzl/plugins.bzl`, and execute:
+
+```
+  ./tools/eclipse/project.py
+```
+
+To execute the tests run:
+
+```
+  bazel test plugins/@PLUGIN@:websession-broker_tests
+```
+
+How to build the Gerrit Plugin API is described in the [Gerrit
+documentation](../../../Documentation/dev-bazel.html#_extension_and_plugin_api_jar_files).
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
new file mode 100644
index 0000000..6da02d3
--- /dev/null
+++ b/src/main/resources/Documentation/config.md
@@ -0,0 +1,19 @@
+Broker WebSession Plugin Configuration
+======================
+
+@PLUGIN@ parameters can be configured using Gerrit config file: $site_dir/etc/gerrit.config.
+
+Sample config
+---------------------
+
+```
+[plugin "websession-broker"]
+        webSessionTopic = gerrit_web_session
+```
+
+Configuration parameters
+---------------------
+
+`plugin.websession-broker.webSessionTopic`
+:   Name of the topic to use for publishing web session events.
+    Default: gerrit\_web\_session
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
new file mode 100644
index 0000000..c17641c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -0,0 +1,175 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.common.cache.Cache;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSessionCache.WebSessionEvent;
+import com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSessionCache.WebSessionEvent.Operation;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerBasedWebSessionCacheTest {
+
+  private static final int DEFAULT_ACCOUNT_ID = 1000000;
+  private static final String KEY = "aSceprtma6B0qZ0hKxXHvQ5iyfUhCcFXxG";
+  private byte[] emptyPayload = new byte[] {-84, -19, 0, 5, 112};
+  byte[] defaultPayload =
+      new byte[] {
+        -84, -19, 0, 5, 115, 114, 0, 45, 99, 111, 109, 46, 103, 111, 111, 103, 108, 101, 46, 103,
+        101, 114, 114, 105, 116, 46, 104, 116, 116, 112, 100, 46, 87, 101, 98, 83, 101, 115, 115,
+        105, 111, 110, 77, 97, 110, 97, 103, 101, 114, 36, 86, 97, 108, 0, 0, 0, 0, 0, 0, 0, 2, 3,
+        0, 0, 120, 112, 119, 97, 1, -64, -124, 61, 2, 0, 0, 1, 111, 13, -8, 90, 7, 3, 0, 5, 34, 97,
+        83, 99, 101, 112, 114, 113, 86, 87, 54, 85, 79, 45, 88, 51, 107, 51, 116, 102, 85, 109, 86,
+        103, 82, 73, 90, 56, 53, 99, 99, 52, 71, 114, 87, 6, 0, 0, 1, 111, 16, 84, -103, -121, 7,
+        34, 97, 83, 99, 101, 112, 114, 114, 82, 103, 119, 49, 71, 110, 90, 56, 122, 54, 49, 49, 86,
+        52, 121, 110, 65, 100, 110, 113, 99, 68, 45, 105, 99, 75, 97, 0, 120
+      };
+
+  @Mock BrokerApi brokerApi;
+  @Mock Cache<String, Val> cache;
+  @Captor ArgumentCaptor<EventMessage> eventCaptor;
+  @Captor ArgumentCaptor<Val> valCaptor;
+
+  BrokerBasedWebSessionCache objectUnderTest;
+
+  @Before
+  public void setup() {
+    DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
+    objectUnderTest = new BrokerBasedWebSessionCache(cache, "web_session_topic", item);
+  }
+
+  @Test
+  public void shouldPublishMessageWhenLoginEvent() {
+    EventMessage eventMessage = createEventMessage();
+    Val value = createVal(eventMessage);
+    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
+
+    objectUnderTest.put(KEY, value);
+
+    verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
+
+    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.ADD);
+    assertThat(event.key).isEqualTo(KEY);
+    assertThat(event.payload).isEqualTo(defaultPayload);
+  }
+
+  @Test
+  public void shouldPublishMessageWhenLogoutEvent() {
+    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
+    when(brokerApi.newMessage(any(UUID.class), any(Event.class))).thenReturn(eventMessage);
+
+    objectUnderTest.invalidate(KEY);
+
+    verify(brokerApi, times(1)).send(anyString(), eventCaptor.capture());
+
+    assertThat(eventCaptor.getValue().getEvent()).isNotNull();
+    WebSessionEvent event = (WebSessionEvent) eventCaptor.getValue().getEvent();
+    assertThat(event.operation).isEqualTo(WebSessionEvent.Operation.REMOVE);
+    assertThat(event.key).isEqualTo(KEY);
+    assertThat(event.payload).isEqualTo(emptyPayload);
+  }
+
+  @Test
+  public void shouldUpdateCacheWhenLoginMessageReceived() {
+    EventMessage eventMessage = createEventMessage();
+
+    objectUnderTest.processMessage(eventMessage);
+
+    verify(cache, times(1)).put(anyString(), valCaptor.capture());
+
+    assertThat(valCaptor.getValue()).isNotNull();
+    Val val = valCaptor.getValue();
+    assertThat(val.getAccountId().get()).isEqualTo(DEFAULT_ACCOUNT_ID);
+  }
+
+  @Test
+  public void shouldUpdateCacheWhenLogoutMessageReceived() {
+    EventMessage eventMessage = createEventMessage(emptyPayload, Operation.REMOVE);
+
+    objectUnderTest.processMessage(eventMessage);
+
+    verify(cache, times(1)).invalidate(KEY);
+  }
+
+  @Test
+  public void shouldSkipCacheUpdateWhenUnknownEventType() {
+    Header header =
+        new Header(
+            UUID.fromString("7cb80dbe-65c4-4f2c-84de-580d98199d4a"),
+            UUID.fromString("97711495-1013-414e-bfd2-44776787520d"));
+    Event event = new Event("sample-event") {};
+    EventMessage eventMessage = new EventMessage(header, event);
+    objectUnderTest.processMessage(eventMessage);
+
+    verifyZeroInteractions(cache);
+  }
+
+  @Test
+  public void shouldSkipCacheUpdateWhenInvalidPayload() {
+    EventMessage eventMessage = createEventMessage(new byte[] {1, 2, 3, 4}, Operation.ADD);
+    objectUnderTest.processMessage(eventMessage);
+
+    verifyZeroInteractions(cache);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Val createVal(EventMessage message) {
+    ArgumentCaptor<Val> valArgumentCaptor = ArgumentCaptor.forClass(Val.class);
+
+    objectUnderTest.processMessage(message);
+    verify(cache).put(anyString(), valArgumentCaptor.capture());
+    reset(cache);
+    return valArgumentCaptor.getValue();
+  }
+
+  private EventMessage createEventMessage() {
+
+    return createEventMessage(defaultPayload, Operation.ADD);
+  }
+
+  private EventMessage createEventMessage(byte[] payload, Operation operation) {
+
+    Header header =
+        new Header(
+            UUID.fromString("7cb80dbe-65c4-4f2c-84de-580d98199d4a"),
+            UUID.fromString("97711495-1013-414e-bfd2-44776787520d"));
+    WebSessionEvent event = new WebSessionEvent(KEY, payload, operation);
+    return new EventMessage(header, event);
+  }
+}