Rebuild web sessions cache on start

Trigger replay of all stored web session events to rebuild up to date
state of the system after node start. Cleanup expired web sessions to
minimize size cache.

Feature: Issue 11600
Change-Id: I10c04f770c65e0be07248961d3bee1efd80d1f39
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 60cdaea..09762f2 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -16,8 +16,8 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.0.3",
-        sha1 = "efdc5bf6897563e2f6f85bfc1b8a5d65e3393424",
+        artifact = "com.gerritforge:events-broker:3.0.4",
+        sha1 = "350b438f532678b1f9a277b7e7b6fa9da4b725b3",
     )
 
     maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
index 3d366be..fbd0882 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSession.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import com.google.gerrit.extensions.annotations.RootRelative;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.CacheBasedWebSession;
 import com.google.gerrit.httpd.WebSession;
@@ -26,8 +27,10 @@
 import com.google.gerrit.server.events.EventTypes;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.internal.UniqueAnnotations;
 import com.google.inject.servlet.RequestScoped;
 import com.google.inject.servlet.ServletScopes;
+import java.lang.annotation.Annotation;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -43,6 +46,14 @@
       DynamicItem.bind(binder(), WebSession.class)
           .to(BrokerBasedWebSession.class)
           .in(RequestScoped.class);
+
+      listener(BrokerBasedWebSessionCache.class);
+      listener(BrokerBasedWebSessionCacheCleaner.class);
+    }
+
+    private void listener(Class<? extends LifecycleListener> classObj) {
+      final Annotation id = UniqueAnnotations.create();
+      bind(LifecycleListener.class).annotatedWith(id).to(classObj);
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
index 1cc4827..ba543ba 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCache.java
@@ -21,18 +21,23 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.Nullable;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.WebSessionManager;
 import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.websession.broker.util.TimeMachine;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.time.Instant;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
@@ -41,23 +46,28 @@
 import java.util.concurrent.ExecutionException;
 
 @Singleton
-public class BrokerBasedWebSessionCache implements Cache<String, WebSessionManager.Val> {
+public class BrokerBasedWebSessionCache
+    implements Cache<String, WebSessionManager.Val>, LifecycleListener {
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static String DEFAULT_WEB_SESSION_TOPIC = "gerrit_web_session";
 
   Cache<String, Val> cache;
   String webSessionTopicName;
   DynamicItem<BrokerApi> brokerApi;
+  TimeMachine timeMachine;
 
   @Inject
   public BrokerBasedWebSessionCache(
       @Named(WebSessionManager.CACHE_NAME) Cache<String, Val> cache,
-      @WebSessionTopicName String webSessionTopicName,
-      DynamicItem<BrokerApi> brokerApi) {
+      DynamicItem<BrokerApi> brokerApi,
+      TimeMachine timeMachine,
+      PluginConfigFactory cfg,
+      @PluginName String pluginName) {
     this.cache = cache;
-    this.webSessionTopicName = webSessionTopicName;
     this.brokerApi = brokerApi;
-    this.brokerApi.get().receiveAsync(webSessionTopicName, this::processMessage);
+    this.timeMachine = timeMachine;
+    this.webSessionTopicName = getWebSessionTopicName(cfg, pluginName);
   }
 
   protected void processMessage(EventMessage message) {
@@ -73,8 +83,12 @@
       case ADD:
         try (ByteArrayInputStream in = new ByteArrayInputStream(event.payload);
             ObjectInputStream inputStream = new ObjectInputStream(in)) {
+          Val value = (Val) inputStream.readObject();
+          Instant expires = Instant.ofEpochMilli(value.getExpiresAt());
+          if (expires.isAfter(timeMachine.now())) {
+            cache.put(event.key, value);
+          }
 
-          cache.put(event.key, (Val) inputStream.readObject());
         } catch (IOException | ClassNotFoundException e) {
           logger.atSevere().withCause(e).log(
               "Malformed event '%s': [Exception: %s]", message.getHeader());
@@ -153,7 +167,10 @@
 
   @Override
   public void cleanUp() {
-    cache.cleanUp();
+    Instant now = timeMachine.now();
+    cache.asMap().entrySet().stream()
+        .filter(entry -> Instant.ofEpochMilli(entry.getValue().getExpiresAt()).isBefore(now))
+        .forEach(entry -> cache.invalidate(entry.getKey()));
   }
 
   private void sendEvent(String key, Val value, WebSessionEvent.Operation operation) {
@@ -179,6 +196,11 @@
     }
   }
 
+  public String getWebSessionTopicName(PluginConfigFactory cfg, String pluginName) {
+    return cfg.getFromGerritConfig(pluginName)
+        .getString("webSessionTopic", DEFAULT_WEB_SESSION_TOPIC);
+  }
+
   public static class WebSessionEvent extends Event {
 
     public enum Operation {
@@ -198,4 +220,13 @@
       this.operation = operation;
     }
   }
+
+  @Override
+  public void start() {
+    brokerApi.get().receiveAsync(webSessionTopicName, this::processMessage);
+    brokerApi.get().replayAllEvents(webSessionTopicName);
+  }
+
+  @Override
+  public void stop() {}
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleaner.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleaner.java
new file mode 100644
index 0000000..b687f76
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleaner.java
@@ -0,0 +1,100 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.config.ConfigUtil;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.concurrent.ScheduledFuture;
+
+@Singleton
+public class BrokerBasedWebSessionCacheCleaner implements LifecycleListener {
+
+  private static int DEFAULT_CLEANUP_INTERVAL = 24;
+
+  WorkQueue queue;
+  Provider<CleanupTask> cleanupTaskProvider;
+  ScheduledFuture<?> scheduledCleanupTask;
+  long cleanupIntervalMillis;
+
+  static class CleanupTask implements Runnable {
+    private final BrokerBasedWebSessionCache brokerBasedWebSessionCache;
+    private final String pluginName;
+
+    @Inject
+    CleanupTask(
+        BrokerBasedWebSessionCache brokerBasedWebSessionCache, @PluginName String pluginName) {
+      this.brokerBasedWebSessionCache = brokerBasedWebSessionCache;
+      this.pluginName = pluginName;
+    }
+
+    @Override
+    public void run() {
+      brokerBasedWebSessionCache.cleanUp();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("[%s] Clean up expired file based websessions", pluginName);
+    }
+  }
+
+  @Inject
+  public BrokerBasedWebSessionCacheCleaner(
+      WorkQueue queue,
+      Provider<CleanupTask> cleanupTaskProvider,
+      PluginConfigFactory cfg,
+      @PluginName String pluginName) {
+    this.queue = queue;
+    this.cleanupTaskProvider = cleanupTaskProvider;
+    this.cleanupIntervalMillis = getCleanupInterval(cfg, pluginName);
+  }
+
+  @Override
+  public void start() {
+    scheduledCleanupTask =
+        queue
+            .getDefaultQueue()
+            .scheduleAtFixedRate(
+                cleanupTaskProvider.get(),
+                SECONDS.toMillis(1),
+                cleanupIntervalMillis,
+                MILLISECONDS);
+  }
+
+  @Override
+  public void stop() {
+    if (scheduledCleanupTask != null) {
+      scheduledCleanupTask.cancel(true);
+      scheduledCleanupTask = null;
+    }
+  }
+
+  private Long getCleanupInterval(PluginConfigFactory cfg, String pluginName) {
+    String fromConfig =
+        Strings.nullToEmpty(cfg.getFromGerritConfig(pluginName).getString("cleanupInterval"));
+    return HOURS.toMillis(ConfigUtil.getTimeUnit(fromConfig, DEFAULT_CLEANUP_INTERVAL, HOURS));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java
index 864ed8f..b3a9c7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/Module.java
@@ -15,17 +15,11 @@
 package com.googlesource.gerrit.plugins.websession.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
 
 public class Module extends LifecycleModule {
-  private static String DEFAULT_WEB_SESSION_TOPIC = "gerrit_web_session";
-
   DynamicItem<BrokerApi> brokerApi;
 
   @Override
@@ -39,12 +33,4 @@
   public void setBrokerApi(DynamicItem<BrokerApi> brokerApi) {
     this.brokerApi = brokerApi;
   }
-
-  @Provides
-  @Singleton
-  @WebSessionTopicName
-  public String getWebSessionTopicName(PluginConfigFactory cfg, @PluginName String pluginName) {
-    return cfg.getFromGerritConfig(pluginName)
-        .getString("webSessionTopic", DEFAULT_WEB_SESSION_TOPIC);
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/util/TimeMachine.java
similarity index 62%
rename from src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java
rename to src/main/java/com/googlesource/gerrit/plugins/websession/broker/util/TimeMachine.java
index e7b191b..f7697e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/websession/broker/WebSessionTopicName.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/websession/broker/util/TimeMachine.java
@@ -12,13 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.websession.broker;
+package com.googlesource.gerrit.plugins.websession.broker.util;
 
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import com.google.inject.Singleton;
+import java.time.Clock;
+import java.time.Instant;
 
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
+@Singleton
+public class TimeMachine {
 
-@Retention(RUNTIME)
-@BindingAnnotation
-public @interface WebSessionTopicName {}
+  private Clock clock = Clock.systemDefaultZone();
+
+  public Instant now() {
+    return Instant.now(getClock());
+  }
+
+  public Clock getClock() {
+    return clock;
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6da02d3..7ac2e37 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -16,4 +16,16 @@
 
 `plugin.websession-broker.webSessionTopic`
 :   Name of the topic to use for publishing web session events.
-    Default: gerrit\_web\_session
\ No newline at end of file
+    Default: gerrit\_web\_session
+
+`plugin.websession-broker.cleanupInterval`
+:   Frequency of the expired web session cleanup operation.
+    Value should use common time unit suffixes to express their setting:
+    * h, hr, hour, hours
+    * d, day, days
+    * w, week, weeks (`1 week` is treated as `7 days`)
+    * mon, month, months (`1 month` is treated as `30 days`)
+    * y, year, years (`1 year` is treated as `365 days`)
+    If a time unit suffix is not specified, `hours` is assumed.
+    Time intervals smaller than one hour are not supported.
+    Default: 24 hours
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleanerTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleanerTest.java
new file mode 100644
index 0000000..dbeed88
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheCleanerTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.websession.broker;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSessionCacheCleaner.CleanupTask;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerBasedWebSessionCacheCleanerTest {
+  private static Long CLEANUP_INTERVAL = 1L;
+  private static String SOME_PLUGIN_NAME = "somePluginName";
+
+  @Mock private ScheduledThreadPoolExecutor executorMock;
+  @Mock private ScheduledFuture<?> scheduledFutureMock;
+  @Mock private WorkQueue workQueueMock;
+  @Mock private Provider<CleanupTask> cleanupTaskProviderMock;
+  @Mock PluginConfigFactory cfg;
+  @Mock PluginConfig pluginConfig;
+
+  private BrokerBasedWebSessionCacheCleaner objectUnderTest;
+
+  @Before
+  public void setUp() {
+    when(pluginConfig.getString("cleanupInterval")).thenReturn(CLEANUP_INTERVAL.toString());
+    when(cfg.getFromGerritConfig(SOME_PLUGIN_NAME)).thenReturn(pluginConfig);
+    when(cleanupTaskProviderMock.get()).thenReturn(new CleanupTask(null, null));
+    when(workQueueMock.getDefaultQueue()).thenReturn(executorMock);
+    doReturn(scheduledFutureMock)
+        .when(executorMock)
+        .scheduleAtFixedRate(isA(CleanupTask.class), anyLong(), anyLong(), isA(TimeUnit.class));
+    objectUnderTest =
+        new BrokerBasedWebSessionCacheCleaner(
+            workQueueMock, cleanupTaskProviderMock, cfg, SOME_PLUGIN_NAME);
+  }
+
+  @Test
+  public void testCleanupTaskRun() {
+    BrokerBasedWebSessionCache cacheMock = mock(BrokerBasedWebSessionCache.class);
+    CleanupTask task = new CleanupTask(cacheMock, null);
+    int numberOfRuns = 5;
+    for (int i = 0; i < numberOfRuns; i++) {
+      task.run();
+    }
+    verify(cacheMock, times(numberOfRuns)).cleanUp();
+  }
+
+  @Test
+  public void testCleanupTaskIsScheduledOnStart() {
+    objectUnderTest.start();
+    verify(executorMock, times(1))
+        .scheduleAtFixedRate(
+            isA(CleanupTask.class), eq(1000l), eq(3600000L), eq(TimeUnit.MILLISECONDS));
+  }
+
+  @Test
+  public void testCleanupTaskIsCancelledOnStop() {
+    objectUnderTest.start();
+    objectUnderTest.stop();
+    verify(scheduledFutureMock, times(1)).cancel(true);
+  }
+
+  @Test
+  public void testCleanupTaskIsCancelledOnlyOnce() {
+    objectUnderTest.start();
+    objectUnderTest.stop();
+    objectUnderTest.stop();
+    verify(scheduledFutureMock, times(1)).cancel(true);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
index c17641c..746d3e6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/websession/broker/BrokerBasedWebSessionCacheTest.java
@@ -27,12 +27,18 @@
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
 import com.google.common.cache.Cache;
+import com.google.common.collect.Maps;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.httpd.WebSessionManager.Val;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.gerrit.server.events.Event;
 import com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSessionCache.WebSessionEvent;
 import com.googlesource.gerrit.plugins.websession.broker.BrokerBasedWebSessionCache.WebSessionEvent.Operation;
+import com.googlesource.gerrit.plugins.websession.broker.util.TimeMachine;
+import java.time.Instant;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -46,6 +52,8 @@
 
   private static final int DEFAULT_ACCOUNT_ID = 1000000;
   private static final String KEY = "aSceprtma6B0qZ0hKxXHvQ5iyfUhCcFXxG";
+  private static final String PLUGIN_NAME = "websession-broker";
+
   private byte[] emptyPayload = new byte[] {-84, -19, 0, 5, 112};
   byte[] defaultPayload =
       new byte[] {
@@ -61,6 +69,9 @@
 
   @Mock BrokerApi brokerApi;
   @Mock Cache<String, Val> cache;
+  @Mock TimeMachine timeMachine;
+  @Mock PluginConfigFactory cfg;
+  @Mock PluginConfig pluginConfig;
   @Captor ArgumentCaptor<EventMessage> eventCaptor;
   @Captor ArgumentCaptor<Val> valCaptor;
 
@@ -68,8 +79,12 @@
 
   @Before
   public void setup() {
+    when(pluginConfig.getString("webSessionTopic", "gerrit_web_session"))
+        .thenReturn("gerrit_web_session");
+    when(cfg.getFromGerritConfig(PLUGIN_NAME)).thenReturn(pluginConfig);
+    when(timeMachine.now()).thenReturn(Instant.EPOCH);
     DynamicItem<BrokerApi> item = DynamicItem.itemOf(BrokerApi.class, brokerApi);
-    objectUnderTest = new BrokerBasedWebSessionCache(cache, "web_session_topic", item);
+    objectUnderTest = new BrokerBasedWebSessionCache(cache, item, timeMachine, cfg, PLUGIN_NAME);
   }
 
   @Test
@@ -148,6 +163,34 @@
     verifyZeroInteractions(cache);
   }
 
+  @Test
+  public void shouldSkipCacheUpdateWhenSessionExpired() {
+    when(timeMachine.now()).thenReturn(Instant.MAX);
+    EventMessage eventMessage = createEventMessage();
+    objectUnderTest.processMessage(eventMessage);
+
+    verifyZeroInteractions(cache);
+  }
+
+  @Test
+  public void shouldCleanupExpiredSessions() {
+    when(timeMachine.now()).thenReturn(Instant.MIN, Instant.MAX);
+
+    EventMessage eventMessage = createEventMessage();
+
+    objectUnderTest.processMessage(eventMessage);
+    verify(cache, times(1)).put(anyString(), valCaptor.capture());
+    assertThat(valCaptor.getValue()).isNotNull();
+
+    ConcurrentMap<String, Val> cacheMap = Maps.newConcurrentMap();
+    cacheMap.put(KEY, valCaptor.getValue());
+    when(cache.asMap()).thenReturn(cacheMap);
+
+    objectUnderTest.cleanUp();
+
+    verify(cache, times(1)).invalidate(KEY);
+  }
+
   @SuppressWarnings("unchecked")
   private Val createVal(EventMessage message) {
     ArgumentCaptor<Val> valArgumentCaptor = ArgumentCaptor.forClass(Val.class);