Introduce cache.threads option to enable a custom cache executor

Since the introduction of Caffeine as alternative to Guava
in Change 244612, the execution of the cache event listeners moved
to a background thread, run by the ForkJoinPool's common pool [1].

The subtle difference has caused issues to the plugins that are
registering listeners, like the high-availability and multi-site:
the consequences have been quite serious because of the inability
to understand if the eviction was caused by a forwarded cache
eviction event or by an execution of Gerrit API or other internals
that caused the removal of the entry.

The use of the JVM common pool has several disadvantages and, under
certain conditions [2], it may even lead to deadlocks or unexpected
blockages.

By introducing the cache.threads option, decouple the cache background
threads execution and allow to configure an explicit separate
thread pool which can be tuned and decoupled from the rest of the
JVM common threads.

Also, allow to restore the plugins' cache listeners legacy behaviour
without losing the ability to leverage the performance of Caffeine
cache vs. the traditional Guava.

By default, this change is a NO-OP because it preserves the current
behaviour of background execution tasks of the Caffeine cache.

Introduce DefaultMemoryCacheFactoryTest class from stable-3.4 for
avoiding further conflicts when merging upstream.

References:
[1] https://github.com/ben-manes/caffeine/wiki/Guava#asynchronous-notifications
[2] https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool

Release-Notes: introduce cache.threads option to allow custom executors for Caffeine caches
Bug: Issue 16565
Change-Id: I204abd1bdbf2bbed5b3d982d14cbc5549ac96ace
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 41435e9..a050021 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -701,6 +701,19 @@
 [[cache]]
 === Section cache
 
+[[cache.threads]]cache.threads::
++
+Number of threads to use when running asynchronous cache tasks.
+The threads executor is delegated to when sending removal notifications to listeners,
+when asynchronous computations like refresh, refreshAfterWrite are performed, or when
+performing periodic maintenance.
++
+**NOTE**: Setting it to 0 disables the dedicated thread pool and indexing will be done in the
+same thread as the operation. This may result in evictions taking longer because the
+listeners are executed in the caller's thread.
++
+By default, the JVM common ForkJoinPool is used.
+
 [[cache.directory]]cache.directory::
 +
 Path to a local directory where Gerrit can write cached entities for
diff --git a/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java b/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
index ee672cd..357cbbb 100644
--- a/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
+++ b/java/com/google/gerrit/server/cache/ForwardingRemovalListener.java
@@ -14,6 +14,7 @@
 
 package com.google.gerrit.server.cache;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
@@ -40,7 +41,8 @@
   private String pluginName = PluginName.GERRIT;
 
   @Inject
-  ForwardingRemovalListener(
+  @VisibleForTesting
+  protected ForwardingRemovalListener(
       PluginSetContext<CacheRemovalListener> listeners, @Assisted String cacheName) {
     this.listeners = listeners;
     this.cacheName = cacheName;
diff --git a/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java b/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
index 9906b3d..afed2f7 100644
--- a/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
+++ b/java/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactory.java
@@ -27,6 +27,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.cache.CacheBackend;
 import com.google.gerrit.server.cache.CacheDef;
@@ -34,20 +35,37 @@
 import com.google.gerrit.server.cache.MemoryCacheFactory;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import java.time.Duration;
+import java.util.concurrent.Executor;
 import org.eclipse.jgit.lib.Config;
 
 class DefaultMemoryCacheFactory implements MemoryCacheFactory {
+  static final String CACHE_EXECUTOR_PREFIX = "InMemoryCacheExecutor";
+  private static final int DEFAULT_CACHE_EXECUTOR_THREADS = -1;
+
   private final Config cfg;
   private final ForwardingRemovalListener.Factory forwardingRemovalListenerFactory;
+  private int executorThreads;
+  private final Executor executor;
 
   @Inject
   DefaultMemoryCacheFactory(
       @GerritServerConfig Config config,
-      ForwardingRemovalListener.Factory forwardingRemovalListenerFactory) {
+      ForwardingRemovalListener.Factory forwardingRemovalListenerFactory,
+      WorkQueue workQueue) {
     this.cfg = config;
     this.forwardingRemovalListenerFactory = forwardingRemovalListenerFactory;
+    this.executorThreads = config.getInt("cache", "threads", DEFAULT_CACHE_EXECUTOR_THREADS);
+
+    if (executorThreads == 0) {
+      executor = MoreExecutors.newDirectExecutorService();
+    } else if (executorThreads > DEFAULT_CACHE_EXECUTOR_THREADS) {
+      executor = workQueue.createQueue(executorThreads, CACHE_EXECUTOR_PREFIX);
+    } else {
+      executor = null;
+    }
   }
 
   @Override
@@ -114,6 +132,10 @@
     builder.maximumWeight(
         cfg.getLong("cache", def.configKey(), "memoryLimit", def.maximumWeight()));
     builder = builder.removalListener(newRemovalListener(def.name()));
+
+    if (executor != null) {
+      builder.executor(executor);
+    }
     builder.weigher(newWeigher(def.weigher()));
 
     Duration expireAfterWrite = def.expireAfterWrite();
diff --git a/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java b/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java
new file mode 100644
index 0000000..9e345c0
--- /dev/null
+++ b/javatests/com/google/gerrit/server/cache/mem/DefaultMemoryCacheFactoryTest.java
@@ -0,0 +1,263 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.cache.mem;
+
+import static com.google.common.base.Functions.identity;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.server.cache.CacheBackend;
+import com.google.gerrit.server.cache.CacheDef;
+import com.google.gerrit.server.cache.ForwardingRemovalListener;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Guice;
+import com.google.inject.TypeLiteral;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.mina.util.ConcurrentHashSet;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMemoryCacheFactoryTest {
+
+  private static final String TEST_CACHE = "test-cache";
+  private static final long TEST_TIMEOUT_SEC = 1;
+  private static final int TEST_CACHE_KEY = 1;
+  private static final int TEST_CACHE_VALUE = 2;
+
+  private DefaultMemoryCacheFactory memoryCacheFactory;
+  private DefaultMemoryCacheFactory memoryCacheFactoryDirectExecutor;
+  private DefaultMemoryCacheFactory memoryCacheFactoryWithThreadPool;
+  private Config memoryCacheConfig;
+  private Config memoryCacheConfigDirectExecutor;
+  private Config memoryCacheConfigWithThreadPool;
+  private CyclicBarrier cacheGetStarted;
+  private CyclicBarrier cacheGetCompleted;
+  private CyclicBarrier evictionReceived;
+  private ForwardingRemovalTrackerListener forwardingRemovalListener;
+  private WorkQueue workQueue;
+
+  @Before
+  public void setUp() {
+
+    IdGenerator idGenerator = Guice.createInjector().getInstance(IdGenerator.class);
+    workQueue = new WorkQueue(idGenerator, 10, new DisabledMetricMaker());
+
+    memoryCacheConfig = new Config();
+    memoryCacheConfigDirectExecutor = new Config();
+    memoryCacheConfigDirectExecutor.setInt("cache", null, "threads", 0);
+    memoryCacheConfigWithThreadPool = new Config();
+    memoryCacheConfigWithThreadPool.setInt("cache", null, "threads", 1);
+
+    forwardingRemovalListener = new ForwardingRemovalTrackerListener();
+    memoryCacheFactory =
+        new DefaultMemoryCacheFactory(
+            memoryCacheConfig, (cache) -> forwardingRemovalListener, workQueue);
+    memoryCacheFactoryDirectExecutor =
+        new DefaultMemoryCacheFactory(
+            memoryCacheConfigDirectExecutor, (cache) -> forwardingRemovalListener, workQueue);
+    memoryCacheFactoryWithThreadPool =
+        new DefaultMemoryCacheFactory(
+            memoryCacheConfigWithThreadPool, (cache) -> forwardingRemovalListener, workQueue);
+    cacheGetStarted = new CyclicBarrier(2);
+    cacheGetCompleted = new CyclicBarrier(2);
+    evictionReceived = new CyclicBarrier(2);
+  }
+
+  @Test
+  public void shouldRunEvictionListenerInBackgroundByDefault() throws Exception {
+    shouldRunEvictionListenerInThreadPool(memoryCacheFactory, "ForkJoinPool");
+  }
+
+  @Test
+  public void shouldRunEvictionListenerInThreadPool() throws Exception {
+    shouldRunEvictionListenerInThreadPool(
+        memoryCacheFactoryWithThreadPool, DefaultMemoryCacheFactory.CACHE_EXECUTOR_PREFIX);
+  }
+
+  private void shouldRunEvictionListenerInThreadPool(
+      DefaultMemoryCacheFactory cacheFactory, String threadPoolPrefix) throws Exception {
+    LoadingCache<Integer, Integer> cache =
+        cacheFactory.build(newCacheDef(1), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+    cache.put(TEST_CACHE_KEY, TEST_CACHE_VALUE);
+    cache.invalidate(TEST_CACHE_KEY);
+
+    assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isFalse();
+
+    evictionReceived.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+
+    assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isTrue();
+    assertThat(forwardingRemovalListener.removalThreadName(TEST_CACHE_KEY))
+        .startsWith(threadPoolPrefix);
+  }
+
+  @Test
+  public void shouldRunEvictionListenerWithDirectExecutor() throws Exception {
+    LoadingCache<Integer, Integer> cache =
+        memoryCacheFactoryDirectExecutor.build(
+            newCacheDef(1), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+    cache.put(TEST_CACHE_KEY, TEST_CACHE_VALUE);
+    cache.invalidate(TEST_CACHE_KEY);
+
+    assertThat(forwardingRemovalListener.contains(TEST_CACHE_KEY, TEST_CACHE_VALUE)).isTrue();
+  }
+
+  @Test
+  public void shouldLoadAllKeysWithDisabledCache() throws Exception {
+    LoadingCache<Integer, Integer> disabledCache =
+        memoryCacheFactory.build(newCacheDef(0), newCacheLoader(identity()), CacheBackend.CAFFEINE);
+
+    List<Integer> keys = Arrays.asList(1, 2);
+    ImmutableMap<Integer, Integer> entries = disabledCache.getAll(keys);
+
+    assertThat(entries).containsExactly(1, 1, 2, 2);
+  }
+
+  private CacheLoader<Integer, Integer> newCacheLoader(Function<Integer, Integer> loadFunc) {
+    return new CacheLoader<Integer, Integer>() {
+
+      @Override
+      public Integer load(Integer n) throws Exception {
+        Integer v = 0;
+        try {
+          cacheGetStarted.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+          v = loadFunc.apply(n);
+          cacheGetCompleted.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+        } catch (TimeoutException | BrokenBarrierException e) {
+        }
+        return v;
+      }
+
+      @Override
+      public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) throws Exception {
+        return StreamSupport.stream(keys.spliterator(), false)
+            .collect(Collectors.toMap(identity(), identity()));
+      }
+    };
+  }
+
+  private class ForwardingRemovalTrackerListener extends ForwardingRemovalListener<Object, Object> {
+    private final ConcurrentHashMap<Object, Set<Object>> removalEvents;
+    private final ConcurrentHashMap<Object, String> removalThreads;
+
+    public ForwardingRemovalTrackerListener() {
+      super(null, null);
+
+      removalEvents = new ConcurrentHashMap<>();
+      removalThreads = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<Object, Object> notification) {
+      Set<Object> setOfValues =
+          removalEvents.computeIfAbsent(
+              notification.getKey(),
+              (key) -> {
+                Set<Object> elements = new ConcurrentHashSet<>();
+                return elements;
+              });
+      setOfValues.add(notification.getValue());
+
+      removalThreads.put(notification.getKey(), Thread.currentThread().getName());
+
+      try {
+        evictionReceived.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+      } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    private boolean contains(Object key, Object value) {
+      return Optional.ofNullable(removalEvents.get(key))
+          .map(sv -> sv.contains(value))
+          .orElse(false);
+    }
+
+    private String removalThreadName(Object key) {
+      return removalThreads.get(key);
+    }
+  }
+
+  private CacheDef<Integer, Integer> newCacheDef(long maximumWeight) {
+    return new CacheDef<Integer, Integer>() {
+
+      @Override
+      public String name() {
+        return TEST_CACHE;
+      }
+
+      @Override
+      public String configKey() {
+        return TEST_CACHE;
+      }
+
+      @Override
+      public TypeLiteral<Integer> keyType() {
+        return null;
+      }
+
+      @Override
+      public TypeLiteral<Integer> valueType() {
+        return null;
+      }
+
+      @Override
+      public long maximumWeight() {
+        return maximumWeight;
+      }
+
+      @Override
+      public Duration expireAfterWrite() {
+        return null;
+      }
+
+      @Override
+      public Duration expireFromMemoryAfterAccess() {
+        return null;
+      }
+
+      @Override
+      public Weigher<Integer, Integer> weigher() {
+        return null;
+      }
+
+      @Override
+      public CacheLoader<Integer, Integer> loader() {
+        return null;
+      }
+    };
+  }
+}