Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Provide default configuration for Gerrit persistent caches
  Introduce metric for caches that fall back to default config
  Add test that detects persistent caches without defaults
  Remove metrics when cache gets closed
  Use static, per-thread buffers for (de)serialization
  Add read/write metrics to TimedValueMarshaller
  ChronicleMapCacheIT: remove Truth8 qualifier from assertThat
  *Marshaller: get CacheSerilizer only once
  Persist caches keys index at configurable pace
  Add restore/persist operations related metrics
  Persist cache keys index to a file
  Add cache keys index metrics
  Avoid full cache scanning for pruning

Change-Id: Iab85e600cfdcb9709484d6f694a0e3639ac9f6b0
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
index 948aaee..a07d8c4 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
@@ -27,6 +27,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,8 +53,10 @@
   private final ChronicleMapCacheConfig.Factory configFactory;
   private final Path cacheDir;
   private final AdministerCachePermission adminCachePermission;
+  private final CachesWithoutChronicleMapConfigMetric metric;
 
   private boolean dryRun;
+  private boolean adjustCachesOnDefaults;
   private Optional<Long> optionalMaxEntries = Optional.empty();
   private Set<String> cacheNames = new HashSet<>();
 
@@ -63,11 +66,13 @@
       SitePaths site,
       DynamicMap<Cache<?, ?>> cacheMap,
       ChronicleMapCacheConfig.Factory configFactory,
-      AdministerCachePermission adminCachePermission) {
+      AdministerCachePermission adminCachePermission,
+      CachesWithoutChronicleMapConfigMetric metric) {
     this.cacheMap = cacheMap;
     this.configFactory = configFactory;
     this.cacheDir = getCacheDir(site, cfg.getString("cache", null, "directory"));
     this.adminCachePermission = adminCachePermission;
+    this.metric = metric;
   }
 
   public boolean isDryRun() {
@@ -78,6 +83,14 @@
     this.dryRun = dryRun;
   }
 
+  public boolean isAdjustCachesOnDefaults() {
+    return adjustCachesOnDefaults;
+  }
+
+  public void setAdjustCachesOnDefaults(boolean adjustCachesOnDefaults) {
+    this.adjustCachesOnDefaults = adjustCachesOnDefaults;
+  }
+
   public Optional<Long> getOptionalMaxEntries() {
     return optionalMaxEntries;
   }
@@ -206,13 +219,14 @@
 
     return configFactory.createWithValues(
         currentChronicleMapConfig.getConfigKey(),
-        resolveNewFile(currentChronicleMapConfig.getPersistedFile().getName()),
+        resolveNewFile(currentChronicleMapConfig.getCacheFile().getName()),
         currentChronicleMapConfig.getExpireAfterWrite(),
         currentChronicleMapConfig.getRefreshAfterWrite(),
         newMaxEntries,
         averageKeySize,
         averageValueSize,
-        currentChronicleMapConfig.getMaxBloatFactor());
+        currentChronicleMapConfig.getMaxBloatFactor(),
+        currentChronicleMapConfig.getPersistIndexEvery());
   }
 
   private long newMaxEntries(ChronicleMapCacheImpl<Object, Object> currentCache) {
@@ -261,6 +275,13 @@
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   private Map<String, ChronicleMapCacheImpl<Object, Object>> getChronicleMapCaches() {
+    if (isAdjustCachesOnDefaults()) {
+      if (metric.cachesOnDefaults().isEmpty()) {
+        return Collections.emptyMap();
+      }
+      cacheNames.addAll(metric.cachesOnDefaults());
+    }
+
     return cacheMap.plugins().stream()
         .map(cacheMap::byPlugin)
         .flatMap(
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
index 580bd98..9e7efd3 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
@@ -49,6 +49,14 @@
     autoAdjustCachesEngine.setOptionalMaxEntries(Optional.of(maxEntries));
   }
 
+  @Option(
+      name = "--adjust-caches-on-defaults",
+      aliases = {"-a"},
+      usage = "Adjust caches that fall back to default configuration.")
+  public void setAdjustCachesOnDefaults(boolean adjustCachesOnDefaults) {
+    autoAdjustCachesEngine.setAdjustCachesOnDefaults(adjustCachesOnDefaults);
+  }
+
   @Argument(
       index = 0,
       required = false,
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
index 49f7a97..91950ec 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
@@ -57,6 +57,11 @@
             .or(() -> Optional.ofNullable(req.getParameter("d")))
             .isPresent());
 
+    autoAdjustCachesEngine.setAdjustCachesOnDefaults(
+        Optional.ofNullable(req.getParameter("adjust-caches-on-defaults"))
+            .or(() -> Optional.ofNullable(req.getParameter("a")))
+            .isPresent());
+
     autoAdjustCachesEngine.setOptionalMaxEntries(
         Optional.ofNullable(req.getParameter("max-entries"))
             .or(() -> Optional.ofNullable(req.getParameter("m")))
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java
new file mode 100644
index 0000000..0730380
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java
@@ -0,0 +1,379 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static java.util.stream.Collectors.toUnmodifiableSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.errorprone.annotations.CompatibleWith;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Description.Units;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+class CacheKeysIndex<T> {
+  /** As opposed to TimedValue keys are equal when their key equals. */
+  class TimedKey {
+    private final T key;
+    private final long created;
+
+    private TimedKey(T key, long created) {
+      this.key = key;
+      this.created = created;
+    }
+
+    T getKey() {
+      return key;
+    }
+
+    long getCreated() {
+      return created;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o instanceof CacheKeysIndex.TimedKey) {
+        @SuppressWarnings("unchecked")
+        TimedKey other = (TimedKey) o;
+        return Objects.equals(key, other.key);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key);
+    }
+  }
+
+  private class Metrics {
+    private final RegistrationHandle indexSize;
+    private final Timer0 addLatency;
+    private final Timer0 removeAndConsumeOlderThanLatency;
+    private final Timer0 removeAndConsumeLruKeyLatency;
+    private final Timer0 restoreLatency;
+    private final Timer0 persistLatency;
+    private final Counter0 restoreFailures;
+    private final Counter0 persistFailures;
+
+    private Metrics(MetricMaker metricMaker, String name) {
+      String sanitizedName = metricMaker.sanitizeMetricName(name);
+
+      indexSize =
+          metricMaker.newCallbackMetric(
+              "cache/chroniclemap/keys_index_size_" + sanitizedName,
+              Integer.class,
+              new Description(
+                  String.format(
+                      "The number of cache index keys for %s cache that are currently in memory",
+                      name)),
+              keys::size);
+
+      addLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_add_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of adding key to the index for %s cache", name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      removeAndConsumeOlderThanLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_"
+                  + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of removing and consuming all keys older than expiration"
+                              + " time for the index for %s cache",
+                          name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      removeAndConsumeLruKeyLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_remove_lru_key_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of removing and consuming LRU key from the index for %s"
+                              + " cache",
+                          name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      restoreLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_restore_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of restoring %s cache's index from file", name))
+                  .setCumulative()
+                  .setUnit(Units.MICROSECONDS));
+
+      persistLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_persist_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of perststing %s cache's index to file", name))
+                  .setCumulative()
+                  .setUnit(Units.MICROSECONDS));
+
+      restoreFailures =
+          metricMaker.newCounter(
+              "cache/chroniclemap/keys_index_restore_failures_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The number of errors caught when restore %s cache index from file operation was performed: ",
+                          name))
+                  .setCumulative()
+                  .setUnit("errors"));
+
+      persistFailures =
+          metricMaker.newCounter(
+              "cache/chroniclemap/keys_index_persist_failures_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The number of errors caught when persist %s cache index to file operation was performed: ",
+                          name))
+                  .setCumulative()
+                  .setUnit("errors"));
+    }
+
+    private void close() {
+      indexSize.remove();
+      addLatency.remove();
+      removeAndConsumeOlderThanLatency.remove();
+      removeAndConsumeLruKeyLatency.remove();
+      restoreLatency.remove();
+      persistLatency.remove();
+      restoreFailures.remove();
+      persistFailures.remove();
+    }
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Set<TimedKey> keys;
+  private final Metrics metrics;
+  private final String name;
+  private final File indexFile;
+  private final File tempIndexFile;
+  private final AtomicBoolean persistInProgress;
+
+  CacheKeysIndex(MetricMaker metricMaker, String name, File indexFile, boolean cacheFileExists) {
+    this.keys = Collections.synchronizedSet(new LinkedHashSet<>());
+    this.metrics = new Metrics(metricMaker, name);
+    this.name = name;
+    this.indexFile = indexFile;
+    this.tempIndexFile = tempIndexFile(indexFile);
+    this.persistInProgress = new AtomicBoolean(false);
+    restore(cacheFileExists);
+  }
+
+  @SuppressWarnings("unchecked")
+  void add(@CompatibleWith("T") Object key, long created) {
+    Objects.requireNonNull(key, "Key value cannot be [null].");
+    TimedKey timedKey = new TimedKey((T) key, created);
+
+    // bubble up MRU key by re-adding it to a set
+    try (Timer0.Context timer = metrics.addLatency.start()) {
+      keys.remove(timedKey);
+      keys.add(timedKey);
+    }
+  }
+
+  void refresh(T key) {
+    add(key, System.currentTimeMillis());
+  }
+
+  void removeAndConsumeKeysOlderThan(long time, Consumer<T> consumer) {
+    try (Timer0.Context timer = metrics.removeAndConsumeOlderThanLatency.start()) {
+      Set<TimedKey> toRemoveAndConsume;
+      synchronized (keys) {
+        toRemoveAndConsume =
+            keys.stream().filter(key -> key.created < time).collect(toUnmodifiableSet());
+      }
+      toRemoveAndConsume.forEach(
+          key -> {
+            keys.remove(key);
+            consumer.accept(key.getKey());
+          });
+    }
+  }
+
+  boolean removeAndConsumeLruKey(Consumer<T> consumer) {
+    try (Timer0.Context timer = metrics.removeAndConsumeLruKeyLatency.start()) {
+      Optional<TimedKey> lruKey;
+      synchronized (keys) {
+        lruKey = keys.stream().findFirst();
+      }
+
+      return lruKey
+          .map(
+              key -> {
+                keys.remove(key);
+                consumer.accept(key.getKey());
+                return true;
+              })
+          .orElse(false);
+    }
+  }
+
+  void invalidate(@CompatibleWith("T") Object key) {
+    keys.remove(key);
+  }
+
+  void clear() {
+    keys.clear();
+  }
+
+  @VisibleForTesting
+  Set<TimedKey> keys() {
+    return keys;
+  }
+
+  void persist() {
+    if (!persistInProgress.compareAndSet(false, true)) {
+      logger.atWarning().log(
+          "Persist cache keys index %s to %s file is already in progress. This persist request was"
+              + " skipped.",
+          name, indexFile);
+      return;
+    }
+
+    logger.atInfo().log("Persisting cache keys index %s to %s file", name, indexFile);
+    try (Timer0.Context timer = metrics.persistLatency.start()) {
+      ArrayList<TimedKey> toPersist;
+      synchronized (keys) {
+        toPersist = new ArrayList<>(keys);
+      }
+      CacheSerializer<T> serializer = CacheSerializers.getKeySerializer(name);
+      try (DataOutputStream dos =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tempIndexFile)))) {
+        for (TimedKey key : toPersist) {
+          writeKey(serializer, dos, key);
+        }
+        dos.flush();
+        indexFile.delete();
+        if (!tempIndexFile.renameTo(indexFile)) {
+          logger.atWarning().log(
+              "Renaming temporary index file %s to %s was not successful",
+              tempIndexFile, indexFile);
+          metrics.persistFailures.increment();
+          return;
+        }
+        logger.atInfo().log("Cache keys index %s was persisted to %s file", name, indexFile);
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Persisting cache keys index %s failed", name);
+        metrics.persistFailures.increment();
+      }
+    } finally {
+      persistInProgress.set(false);
+    }
+  }
+
+  void restore(boolean cacheFileExists) {
+    try {
+      if (tempIndexFile.isFile()) {
+        logger.atWarning().log(
+            "Gerrit was not closed properly as index persist operation was not finished: temporary"
+                + " index storage file %s exists for %s cache.",
+            tempIndexFile, name);
+        metrics.restoreFailures.increment();
+        if (!tempIndexFile.delete()) {
+          logger.atSevere().log(
+              "Cannot delete the temporary index storage file %s.", tempIndexFile);
+          return;
+        }
+      }
+
+      if (!indexFile.isFile() || !indexFile.canRead()) {
+        if (cacheFileExists) {
+          logger.atWarning().log(
+              "Restoring cache keys index %s not possible. File %s doesn't exist or cannot be read."
+                  + " Existing persisted entries will be pruned only when they are accessed after"
+                  + " Gerrit start.",
+              name, indexFile.getPath());
+          metrics.restoreFailures.increment();
+        }
+        return;
+      }
+
+      logger.atInfo().log("Restoring cache keys index %s from %s file", name, indexFile);
+      try (Timer0.Context timer = metrics.restoreLatency.start()) {
+        CacheSerializer<T> serializer = CacheSerializers.getKeySerializer(name);
+        try (DataInputStream dis =
+            new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))) {
+          while (dis.available() > 0) {
+            keys.add(readKey(serializer, dis));
+          }
+          logger.atInfo().log("Cache keys index %s was restored from %s file", name, indexFile);
+        }
+      }
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("Restoring cache keys index %s failed", name);
+      metrics.restoreFailures.increment();
+    }
+  }
+
+  void close() {
+    persist();
+    metrics.close();
+  }
+
+  static final File tempIndexFile(File indexFile) {
+    return new File(String.format("%s.tmp", indexFile.getPath()));
+  }
+
+  private void writeKey(CacheSerializer<T> serializer, DataOutput out, TimedKey key)
+      throws IOException {
+    byte[] serializeKey = serializer.serialize(key.getKey());
+    out.writeLong(key.getCreated());
+    out.writeInt(serializeKey.length);
+    out.write(serializeKey);
+  }
+
+  private TimedKey readKey(CacheSerializer<T> serializer, DataInput in) throws IOException {
+    long created = in.readLong();
+    int keySize = in.readInt();
+    byte[] serializedKey = new byte[keySize];
+    in.readFully(serializedKey);
+    T key = serializer.deserialize(serializedKey);
+    return new TimedKey(key, created);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
index 8546981..387391e 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
@@ -13,10 +13,12 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.server.cache.PersistentCacheDef;
 import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import com.google.inject.Singleton;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Singleton
@@ -30,16 +32,6 @@
     registerCacheValueSerializer(cacheName, def.valueSerializer());
   }
 
-  static <K, V> void registerCacheKeySerializer(
-      String cacheName, CacheSerializer<K> keySerializer) {
-    keySerializers.computeIfAbsent(cacheName, (name) -> keySerializer);
-  }
-
-  static <K, V> void registerCacheValueSerializer(
-      String cacheName, CacheSerializer<V> valueSerializer) {
-    valueSerializers.computeIfAbsent(cacheName, (name) -> valueSerializer);
-  }
-
   @SuppressWarnings("unchecked")
   public static <K> CacheSerializer<K> getKeySerializer(String name) {
     if (keySerializers.containsKey(name)) {
@@ -55,4 +47,29 @@
     }
     throw new IllegalStateException("Could not find value serializer for " + name);
   }
+
+  @VisibleForTesting
+  static <K, V> void registerCacheKeySerializer(
+      String cacheName, CacheSerializer<K> keySerializer) {
+    keySerializers.computeIfAbsent(cacheName, (name) -> keySerializer);
+  }
+
+  @VisibleForTesting
+  static <K, V> void registerCacheValueSerializer(
+      String cacheName, CacheSerializer<V> valueSerializer) {
+    valueSerializers.computeIfAbsent(cacheName, (name) -> valueSerializer);
+  }
+
+  @VisibleForTesting
+  static Set<String> getSerializersNames() {
+    // caches registration during Gerrit's start is performed through registerCacheDef hence there
+    // is no need to check both maps for all serializers names
+    return keySerializers.keySet();
+  }
+
+  @VisibleForTesting
+  static void clear() {
+    keySerializers.clear();
+    valueSerializers.clear();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java
new file mode 100644
index 0000000..0443fc6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java
@@ -0,0 +1,57 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+@Singleton
+class CachesWithoutChronicleMapConfigMetric {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Set<String> uniqueCacheNames;
+  private final Counter0 numberOfCachesWithoutConfig;
+
+  @Inject
+  CachesWithoutChronicleMapConfigMetric(MetricMaker metricMaker) {
+    this.uniqueCacheNames = Collections.synchronizedSet(new HashSet<>());
+
+    String metricName = "cache/chroniclemap/caches_without_configuration";
+    this.numberOfCachesWithoutConfig =
+        metricMaker.newCounter(
+            metricName,
+            new Description(
+                    "The number of caches that have no chronicle map configuration provided in 'gerrit.config' and use defaults.")
+                .setUnit("caches"));
+  }
+
+  void incrementForCache(String name) {
+    if (uniqueCacheNames.add(name)) {
+      numberOfCachesWithoutConfig.increment();
+      logger.atWarning().log("Fall back to default configuration for '%s' cache", name);
+    }
+  }
+
+  Set<String> cachesOnDefaults() {
+    return uniqueCacheNames;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
index 79e4b38..e2c5afe 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
@@ -13,21 +13,31 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheFactory.PRUNE_DELAY;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import java.io.File;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Optional;
+import org.apache.commons.io.FilenameUtils;
 import org.eclipse.jgit.lib.Config;
 
 public class ChronicleMapCacheConfig {
-  private final File persistedFile;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final File cacheFile;
+  private final boolean cacheFileExists;
+  private final File indexFile;
   private final long maxEntries;
   private final long averageKeySize;
   private final long averageValueSize;
@@ -35,59 +45,74 @@
   private final Duration refreshAfterWrite;
   private final int maxBloatFactor;
   private final int percentageFreeSpaceEvictionThreshold;
-  private final int percentageHotKeys;
   private final String configKey;
+  private final Duration persistIndexEvery;
+  private final long persistIndexEveryNthPrune;
 
   public interface Factory {
     ChronicleMapCacheConfig create(
         @Assisted("ConfigKey") String configKey,
-        @Assisted File persistedFile,
+        @Assisted File cacheFile,
         @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
         @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite);
 
     ChronicleMapCacheConfig createWithValues(
         @Assisted("ConfigKey") String configKey,
-        @Assisted File persistedFile,
+        @Assisted File cacheFile,
         @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
         @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite,
         @Assisted("maxEntries") long maxEntries,
         @Assisted("avgKeySize") long avgKeySize,
         @Assisted("avgValueSize") long avgValueSize,
-        @Assisted("maxBloatFactor") int maxBloatFactor);
+        @Assisted("maxBloatFactor") int maxBloatFactor,
+        @Assisted("PersistIndexEvery") Duration persistIndexEver);
   }
 
   @AssistedInject
   ChronicleMapCacheConfig(
       @GerritServerConfig Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
       @Assisted("ConfigKey") String configKey,
-      @Assisted File persistedFile,
+      @Assisted File cacheFile,
       @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
       @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite) {
 
     this(
         cfg,
+        cachesWithoutConfigMetric,
         configKey,
-        persistedFile,
+        cacheFile,
         expireAfterWrite,
         refreshAfterWrite,
         cfg.getLong("cache", configKey, "maxEntries", Defaults.maxEntriesFor(configKey)),
         cfg.getLong("cache", configKey, "avgKeySize", Defaults.averageKeySizeFor(configKey)),
         cfg.getLong("cache", configKey, "avgValueSize", Defaults.avgValueSizeFor(configKey)),
-        cfg.getInt("cache", configKey, "maxBloatFactor", Defaults.maxBloatFactorFor(configKey)));
+        cfg.getInt("cache", configKey, "maxBloatFactor", Defaults.maxBloatFactorFor(configKey)),
+        Duration.ofSeconds(
+            cfg.getTimeUnit(
+                "cache",
+                null,
+                "persistIndexEvery",
+                Defaults.persistIndexEvery().toSeconds(),
+                SECONDS)));
   }
 
   @AssistedInject
   ChronicleMapCacheConfig(
       @GerritServerConfig Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
       @Assisted("ConfigKey") String configKey,
-      @Assisted File persistedFile,
+      @Assisted File cacheFile,
       @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
       @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite,
       @Assisted("maxEntries") long maxEntries,
       @Assisted("avgKeySize") long avgKeySize,
       @Assisted("avgValueSize") long avgValueSize,
-      @Assisted("maxBloatFactor") int maxBloatFactor) {
-    this.persistedFile = persistedFile;
+      @Assisted("maxBloatFactor") int maxBloatFactor,
+      @Assisted("PersistIndexEvery") Duration persistIndexEvery) {
+    this.cacheFile = cacheFile;
+    this.cacheFileExists = cacheFile.exists();
+    this.indexFile = resolveIndexFile(cacheFile);
     this.configKey = configKey;
 
     this.maxEntries = maxEntries;
@@ -116,22 +141,30 @@
             "percentageFreeSpaceEvictionThreshold",
             Defaults.percentageFreeSpaceEvictionThreshold());
 
-    this.percentageHotKeys =
-        cfg.getInt("cache", configKey, "percentageHotKeys", Defaults.percentageHotKeys());
-
-    if (percentageHotKeys <= 0 || percentageHotKeys >= 100) {
-      throw new IllegalArgumentException("Invalid 'percentageHotKeys': should be in range [1-99]");
+    long persistIndexEverySeconds = persistIndexEvery.getSeconds();
+    if (persistIndexEverySeconds < PRUNE_DELAY) {
+      logger.atWarning().log(
+          "Configured 'persistIndexEvery' duration [%ds] is lower than minimal threshold [%ds]."
+              + " Minimal threshold will be used.",
+          persistIndexEverySeconds, PRUNE_DELAY);
+      persistIndexEverySeconds = PRUNE_DELAY;
+    } else if (persistIndexEverySeconds % PRUNE_DELAY != 0L) {
+      logger.atWarning().log(
+          "Configured 'persistIndexEvery' duration [%ds] will be rounded down to a multiple of"
+              + " [%ds]. [%ds] is the minimal 'persistIndexEvery' resolution.",
+          persistIndexEverySeconds, PRUNE_DELAY, PRUNE_DELAY);
+      persistIndexEverySeconds = (persistIndexEverySeconds / PRUNE_DELAY) * PRUNE_DELAY;
     }
+    this.persistIndexEvery = Duration.ofSeconds(persistIndexEverySeconds);
+    this.persistIndexEveryNthPrune = persistIndexEverySeconds / PRUNE_DELAY;
+
+    emitMetricForMissingConfig(cfg, cachesWithoutConfigMetric, configKey);
   }
 
   public int getPercentageFreeSpaceEvictionThreshold() {
     return percentageFreeSpaceEvictionThreshold;
   }
 
-  public int getpercentageHotKeys() {
-    return percentageHotKeys;
-  }
-
   public Duration getExpireAfterWrite() {
     return expireAfterWrite;
   }
@@ -140,12 +173,28 @@
     return refreshAfterWrite;
   }
 
+  public Duration getPersistIndexEvery() {
+    return persistIndexEvery;
+  }
+
+  public long getPersistIndexEveryNthPrune() {
+    return persistIndexEveryNthPrune;
+  }
+
   public long getMaxEntries() {
     return maxEntries;
   }
 
-  public File getPersistedFile() {
-    return persistedFile;
+  public File getCacheFile() {
+    return cacheFile;
+  }
+
+  public boolean getCacheFileExists() {
+    return cacheFileExists;
+  }
+
+  public File getIndexFile() {
+    return indexFile;
   }
 
   public long getAverageKeySize() {
@@ -164,11 +213,41 @@
     return configKey;
   }
 
+  private static void emitMetricForMissingConfig(
+      Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
+      String configKey) {
+    if (!cfg.getSubsections("cache").stream()
+        .filter(cache -> cache.equalsIgnoreCase(configKey))
+        .filter(
+            cache ->
+                isConfigured(cfg, cache, "maxEntries")
+                    || isConfigured(cfg, cache, "avgKeySize")
+                    || isConfigured(cfg, cache, "avgValueSize")
+                    || isConfigured(cfg, cache, "maxBloatFactor"))
+        .findAny()
+        .isPresent()) {
+      cachesWithoutConfigMetric.incrementForCache(configKey);
+    }
+  }
+
+  private static boolean isConfigured(Config cfg, String configKey, String name) {
+    return cfg.getString("cache", configKey, name) != null;
+  }
+
+  private static File resolveIndexFile(File persistedCacheFile) {
+    String cacheFileName = persistedCacheFile.getName();
+    String indexFileName = String.format("%s.index", FilenameUtils.getBaseName(cacheFileName));
+
+    return Path.of(persistedCacheFile.getParent()).resolve(indexFileName).toFile();
+  }
+
   private static long toSeconds(@Nullable Duration duration) {
     return duration != null ? duration.getSeconds() : 0;
   }
 
-  protected static class Defaults {
+  @Singleton
+  static class Defaults {
 
     public static final long DEFAULT_MAX_ENTRIES = 1000;
 
@@ -178,24 +257,32 @@
     public static final int DEFAULT_MAX_BLOAT_FACTOR = 1;
 
     public static final int DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD = 90;
-    public static final int DEFAULT_PERCENTAGE_HOT_KEYS = 50;
 
-    private static final ImmutableMap<String, DefaultConfig> defaultMap =
+    public static final Duration DEFAULT_PERSIST_INDEX_EVERY = Duration.ofMinutes(15);
+
+    @VisibleForTesting
+    static final ImmutableMap<String, DefaultConfig> defaultMap =
         new ImmutableMap.Builder<String, DefaultConfig>()
-            .put("web_sessions", DefaultConfig.create(45, 221, 1000, 1))
-            .put("change_notes", DefaultConfig.create(36, 10240, 1000, 3))
             .put("accounts", DefaultConfig.create(30, 256, 1000, 1))
-            .put("gerrit_file_diff", DefaultConfig.create(98, 10240, 1000, 2))
-            .put("git_file_diff", DefaultConfig.create(98, 10240, 1000, 2))
+            .put("approvals", DefaultConfig.create(103, 365, 1000, 3))
+            .put("change_kind", DefaultConfig.create(59, 26, 1000, 1))
+            .put("change_notes", DefaultConfig.create(36, 10240, 1000, 3))
+            .put("comment_context", DefaultConfig.create(80, 662, 2000, 3))
+            .put("conflicts", DefaultConfig.create(70, 16, 1000, 1))
             .put("diff_intraline", DefaultConfig.create(512, 2048, 1000, 2))
             .put("diff_summary", DefaultConfig.create(128, 2048, 1000, 1))
             .put("external_ids_map", DefaultConfig.create(128, 204800, 2, 1))
-            .put("oauth_tokens", DefaultConfig.create(8, 2048, 1000, 1))
-            .put("change_kind", DefaultConfig.create(59, 26, 1000, 1))
+            .put("gerrit_file_diff", DefaultConfig.create(342, 883, 1000, 1))
+            .put("git_file_diff", DefaultConfig.create(349, 943, 1000, 1))
+            .put("git_modified_files", DefaultConfig.create(349, 943, 1000, 1))
+            .put("git_tags", DefaultConfig.create(43, 6673, 1000, 3))
+            .put("groups_byuuid_persisted", DefaultConfig.create(64, 182, 1000, 1))
             .put("mergeability", DefaultConfig.create(79, 16, 65000, 2))
-            .put("pure_revert", DefaultConfig.create(55, 16, 1000, 1))
+            .put("modified_files", DefaultConfig.create(138, 2600, 1000, 1))
+            .put("oauth_tokens", DefaultConfig.create(8, 2048, 1000, 1))
             .put("persisted_projects", DefaultConfig.create(128, 1024, 250, 2))
-            .put("conflicts", DefaultConfig.create(70, 16, 1000, 1))
+            .put("pure_revert", DefaultConfig.create(55, 16, 1000, 1))
+            .put("web_sessions", DefaultConfig.create(45, 221, 1000, 1))
             .build();
 
     public static long averageKeySizeFor(String configKey) {
@@ -226,8 +313,8 @@
       return DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD;
     }
 
-    public static int percentageHotKeys() {
-      return DEFAULT_PERCENTAGE_HOT_KEYS;
+    public static Duration persistIndexEvery() {
+      return DEFAULT_PERSIST_INDEX_EVERY;
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
index 1161699..e4474b0 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
@@ -46,6 +46,8 @@
 
 @Singleton
 class ChronicleMapCacheFactory extends PersistentCacheBaseFactory implements LifecycleListener {
+  static final long PRUNE_DELAY = 30;
+
   private final ChronicleMapCacheConfig.Factory configFactory;
   private final MetricMaker metricMaker;
   private final DynamicMap<Cache<?, ?>> cacheMap;
@@ -53,6 +55,7 @@
   private final ScheduledExecutorService cleanup;
 
   private final LoggingContextAwareExecutorService storePersistenceExecutor;
+  private final LoggingContextAwareExecutorService indexPersistenceExecutor;
 
   @Inject
   ChronicleMapCacheFactory(
@@ -79,6 +82,10 @@
         new LoggingContextAwareExecutorService(
             Executors.newFixedThreadPool(
                 1, new ThreadFactoryBuilder().setNameFormat("ChronicleMap-Store-%d").build()));
+    this.indexPersistenceExecutor =
+        new LoggingContextAwareExecutorService(
+            Executors.newFixedThreadPool(
+                1, new ThreadFactoryBuilder().setNameFormat("ChronicleMap-Index-%d").build()));
   }
 
   @Override
@@ -121,7 +128,8 @@
               config,
               metricMaker,
               memLoader,
-              new InMemoryCacheLoadingFromStoreImpl<>(mem, false));
+              new InMemoryCacheLoadingFromStoreImpl<>(mem, false),
+              indexPersistenceExecutor);
 
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -173,7 +181,8 @@
               config,
               metricMaker,
               memLoader,
-              new InMemoryCacheLoadingFromStoreImpl<>(mem, true));
+              new InMemoryCacheLoadingFromStoreImpl<>(mem, true),
+              indexPersistenceExecutor);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -198,13 +207,19 @@
   @Override
   public void start() {
     for (ChronicleMapCacheImpl<?, ?> cache : caches) {
-      cleanup.scheduleWithFixedDelay(cache::prune, 30, 30, TimeUnit.SECONDS);
+      cleanup.scheduleWithFixedDelay(cache::prune, PRUNE_DELAY, PRUNE_DELAY, TimeUnit.SECONDS);
     }
   }
 
   @Override
   public void stop() {
     cleanup.shutdownNow();
+    for (ChronicleMapCacheImpl<?, ?> cache : caches) {
+      cache.close();
+    }
+    caches.clear();
+    storePersistenceExecutor.shutdown();
+    indexPersistenceExecutor.shutdown();
   }
 
   public static File fileName(Path cacheDir, String name, Integer version) {
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
index b704a7b..7efc8b5 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
@@ -17,7 +17,6 @@
 import com.google.common.cache.CacheStats;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.cache.PersistentCache;
@@ -29,6 +28,7 @@
 import java.time.Instant;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.LongAdder;
 import net.openhft.chronicle.map.ChronicleMap;
 import net.openhft.chronicle.map.ChronicleMapBuilder;
@@ -45,10 +45,12 @@
   private final LongAdder loadSuccessCount = new LongAdder();
   private final LongAdder loadExceptionCount = new LongAdder();
   private final LongAdder totalLoadTime = new LongAdder();
-  private final InMemoryLRU<K> hotEntries;
+  private final CacheKeysIndex<K> keysIndex;
   private final PersistentCacheDef<K, V> cacheDefinition;
   private final ChronicleMapCacheLoader<K, V> memLoader;
   private final InMemoryCache<K, V> mem;
+  private final Executor indexPersistenceExecutor;
+  private long pruneCount;
 
   ChronicleMapCacheImpl(PersistentCacheDef<K, V> def, ChronicleMapCacheConfig config)
       throws IOException {
@@ -56,17 +58,15 @@
 
     this.cacheDefinition = def;
     this.config = config;
-    this.hotEntries =
-        new InMemoryLRU<>(
-            (int) Math.max(config.getMaxEntries() * config.getpercentageHotKeys() / 100, 1));
+    this.keysIndex =
+        new CacheKeysIndex<>(
+            metricMaker, def.name(), config.getIndexFile(), config.getCacheFileExists());
     this.store = createOrRecoverStore(def, config, metricMaker);
     this.memLoader =
         new ChronicleMapCacheLoader<>(
             MoreExecutors.directExecutor(), store, config.getExpireAfterWrite());
     this.mem = memLoader.asInMemoryCacheBypass();
-
-    ChronicleMapCacheMetrics metrics = new ChronicleMapCacheMetrics(metricMaker);
-    metrics.registerCallBackMetrics(def.name(), this);
+    this.indexPersistenceExecutor = MoreExecutors.directExecutor();
   }
 
   ChronicleMapCacheImpl(
@@ -74,19 +74,18 @@
       ChronicleMapCacheConfig config,
       MetricMaker metricMaker,
       ChronicleMapCacheLoader<K, V> memLoader,
-      InMemoryCache<K, V> mem) {
+      InMemoryCache<K, V> mem,
+      Executor indexPersistenceExecutor) {
 
     this.cacheDefinition = def;
     this.config = config;
-    this.hotEntries =
-        new InMemoryLRU<>(
-            (int) Math.max(config.getMaxEntries() * config.getpercentageHotKeys() / 100, 1));
+    this.keysIndex =
+        new CacheKeysIndex<>(
+            metricMaker, def.name(), config.getIndexFile(), config.getCacheFileExists());
     this.memLoader = memLoader;
     this.mem = mem;
     this.store = memLoader.getStore();
-
-    ChronicleMapCacheMetrics metrics = new ChronicleMapCacheMetrics(metricMaker);
-    metrics.registerCallBackMetrics(def.name(), this);
+    this.indexPersistenceExecutor = indexPersistenceExecutor;
   }
 
   @SuppressWarnings({"unchecked", "cast", "rawtypes"})
@@ -111,7 +110,9 @@
     }
 
     mapBuilder.averageValueSize(config.getAverageValueSize());
-    mapBuilder.valueMarshaller(new TimedValueMarshaller<>(def.name()));
+
+    TimedValueMarshaller<V> valueMarshaller = new TimedValueMarshaller<>(metricMaker, def.name());
+    mapBuilder.valueMarshaller(valueMarshaller);
 
     mapBuilder.entries(config.getMaxEntries());
 
@@ -123,12 +124,12 @@
             + "a function of the number of entries in the cache",
         def.diskLimit(), def.name());
     ChronicleMap<KeyWrapper<K>, TimedValue<V>> store =
-        mapBuilder.createOrRecoverPersistedTo(config.getPersistedFile());
+        mapBuilder.createOrRecoverPersistedTo(config.getCacheFile());
 
     logger.atInfo().log(
         "Initialized '%s'|version: %s|avgKeySize: %s bytes|avgValueSize:"
             + " %s bytes|entries: %s|maxBloatFactor: %s|remainingAutoResizes:"
-            + " %s|percentageFreeSpace: %s",
+            + " %s|percentageFreeSpace: %s|persistIndexEvery: %s",
         def.name(),
         def.version(),
         mapBuilder.constantlySizedKeys() ? "CONSTANT" : config.getAverageKeySize(),
@@ -136,45 +137,22 @@
         config.getMaxEntries(),
         config.getMaxBloatFactor(),
         store.remainingAutoResizes(),
-        store.percentageFreeSpace());
+        store.percentageFreeSpace(),
+        config.getPersistIndexEvery());
 
-    return new ChronicleMapStore<>(store, config, metricMaker);
+    return new ChronicleMapStore<K, V>(store, config, metricMaker) {
+      @Override
+      public void close() {
+        super.close();
+        valueMarshaller.close();
+      }
+    };
   }
 
   protected PersistentCacheDef<K, V> getCacheDefinition() {
     return cacheDefinition;
   }
 
-  private static class ChronicleMapCacheMetrics {
-
-    private final MetricMaker metricMaker;
-
-    ChronicleMapCacheMetrics(MetricMaker metricMaker) {
-      this.metricMaker = metricMaker;
-    }
-
-    <K, V> void registerCallBackMetrics(String name, ChronicleMapCacheImpl<K, V> cache) {
-      String sanitizedName = metricMaker.sanitizeMetricName(name);
-      String HOT_KEYS_CAPACITY_METRIC = "cache/chroniclemap/hot_keys_capacity_" + sanitizedName;
-      String HOT_KEYS_SIZE_METRIC = "cache/chroniclemap/hot_keys_size_" + sanitizedName;
-
-      metricMaker.newConstantMetric(
-          HOT_KEYS_CAPACITY_METRIC,
-          cache.hotEntries.getCapacity(),
-          new Description(
-              String.format(
-                  "The number of hot cache keys for %s cache that can be kept in memory", name)));
-
-      metricMaker.newCallbackMetric(
-          HOT_KEYS_SIZE_METRIC,
-          Integer.class,
-          new Description(
-              String.format(
-                  "The number of hot cache keys for %s cache that are currently in memory", name)),
-          cache.hotEntries::size);
-    }
-  }
-
   public ChronicleMapCacheConfig getConfig() {
     return config;
   }
@@ -187,6 +165,7 @@
       return null;
     }
 
+    keysIndex.add(objKey, timedValue.getCreated());
     return timedValue.getValue();
   }
 
@@ -199,6 +178,9 @@
       if (needsRefresh(valueHolder.getCreated())) {
         store.remove(keyWrapper);
         mem.refresh(key);
+        keysIndex.refresh(key);
+      } else {
+        keysIndex.add(key, valueHolder.getCreated());
       }
       return valueHolder.getValue();
     }
@@ -211,11 +193,12 @@
   @Override
   public V get(K key, Callable<? extends V> valueLoader) throws ExecutionException {
     try {
-      return mem.get(key, () -> getFromStore(key, valueLoader)).getValue();
+      TimedValue<V> value = mem.get(key, () -> getFromStore(key, valueLoader));
+      keysIndex.add(key, value.getCreated());
+      return value.getValue();
+    } catch (ExecutionException e) {
+      throw e;
     } catch (Exception e) {
-      if (e instanceof ExecutionException) {
-        throw (ExecutionException) e;
-      }
       throw new ExecutionException(e);
     }
   }
@@ -259,6 +242,7 @@
     KeyWrapper<?> wrappedKey = new KeyWrapper<>(key);
     if (store.tryPut((KeyWrapper<K>) wrappedKey, (TimedValue<V>) wrappedValue)) {
       mem.put((K) key, (TimedValue<V>) wrappedValue);
+      keysIndex.add(key, wrappedValue.getCreated());
     }
   }
 
@@ -276,6 +260,7 @@
   public void putUnchecked(KeyWrapper<Object> wrappedKey, TimedValue<Object> wrappedValue) {
     if (store.tryPut((KeyWrapper<K>) wrappedKey, (TimedValue<V>) wrappedValue)) {
       mem.put((K) wrappedKey.getValue(), (TimedValue<V>) wrappedValue);
+      keysIndex.add(wrappedKey.getValue(), wrappedValue.getCreated());
     }
   }
 
@@ -284,32 +269,29 @@
     TimedValue<V> timedVal = new TimedValue<>(val);
     if (putTimedToStore(key, timedVal)) {
       mem.put(key, timedVal);
+      keysIndex.add(key, timedVal.getCreated());
     }
   }
 
   boolean putTimedToStore(K key, TimedValue<V> timedVal) {
     KeyWrapper<K> wrappedKey = new KeyWrapper<>(key);
-    boolean putSuccess = store.tryPut(wrappedKey, timedVal);
-    if (putSuccess) {
-      hotEntries.add(key);
-    }
-    return putSuccess;
+    return store.tryPut(wrappedKey, timedVal);
   }
 
   public void prune() {
     if (!config.getExpireAfterWrite().isZero()) {
-      store.forEachEntry(
-          c -> {
-            if (memLoader.expired(c.value().get().getCreated())) {
-              hotEntries.remove(c.key().get().getValue());
-              c.context().remove(c);
-            }
-          });
+      long expirationTime = System.currentTimeMillis() - config.getExpireAfterWrite().toMillis();
+      keysIndex.removeAndConsumeKeysOlderThan(
+          expirationTime, key -> store.remove(new KeyWrapper<>(key)));
     }
 
     if (runningOutOfFreeSpace()) {
       evictColdEntries();
     }
+
+    if (++pruneCount % config.getPersistIndexEveryNthPrune() == 0L) {
+      indexPersistenceExecutor.execute(keysIndex::persist);
+    }
   }
 
   private boolean needsRefresh(long created) {
@@ -324,13 +306,8 @@
   }
 
   private void evictColdEntries() {
-    store.forEachEntryWhile(
-        e -> {
-          if (!hotEntries.contains(e.key().get().getValue())) {
-            e.doRemove();
-          }
-          return runningOutOfFreeSpace();
-        });
+    while (runningOutOfFreeSpace()
+        && keysIndex.removeAndConsumeLruKey(key -> store.remove(new KeyWrapper<>(key)))) ;
   }
 
   @SuppressWarnings("unchecked")
@@ -339,14 +316,14 @@
     KeyWrapper<K> wrappedKey = (KeyWrapper<K>) new KeyWrapper<>(key);
     store.remove(wrappedKey);
     mem.invalidate(key);
-    hotEntries.remove((K) key);
+    keysIndex.invalidate(key);
   }
 
   @Override
   public void invalidateAll() {
     store.clear();
-    hotEntries.invalidateAll();
     mem.invalidateAll();
+    keysIndex.clear();
   }
 
   ChronicleMap<KeyWrapper<K>, TimedValue<V>> getStore() {
@@ -367,7 +344,7 @@
   public DiskStats diskStats() {
     return new DiskStats(
         store.longSize(),
-        config.getPersistedFile().length(),
+        config.getCacheFile().length(),
         hitCount.longValue(),
         missCount.longValue());
   }
@@ -378,6 +355,7 @@
 
   public void close() {
     store.close();
+    keysIndex.close();
   }
 
   public double percentageUsedAutoResizes() {
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
index 99fe5c1..a9b3856 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
@@ -26,5 +26,6 @@
     factory(ChronicleMapCacheConfig.Factory.class);
     bind(PersistentCacheFactory.class).to(ChronicleMapCacheFactory.class);
     listener().to(ChronicleMapCacheFactory.class);
+    bind(CachesWithoutChronicleMapConfigMetric.class).asEagerSingleton();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
index 0419ca4..38759f4 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
@@ -320,6 +320,7 @@
   @Override
   public void close() {
     store.close();
+    metrics.close();
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
index cf758b5..c27be99 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
@@ -14,15 +14,19 @@
 
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.extensions.registration.RegistrationHandle;
 import com.google.gerrit.metrics.Counter0;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.MetricMaker;
+import java.util.HashSet;
+import java.util.Set;
 
 class ChronicleMapStoreMetrics {
   private final String sanitizedName;
   private final MetricMaker metricMaker;
   private final String name;
   private final Counter0 storePutFailures;
+  private final Set<RegistrationHandle> callbacks;
 
   ChronicleMapStoreMetrics(String name, MetricMaker metricMaker) {
     this.name = name;
@@ -37,6 +41,7 @@
                         + name)
                 .setCumulative()
                 .setUnit("errors"));
+    this.callbacks = new HashSet<>(3, 1.0F);
   }
 
   void incrementPutFailures() {
@@ -50,27 +55,37 @@
         "cache/chroniclemap/remaining_autoresizes_" + sanitizedName;
     String MAX_AUTORESIZES_METRIC = "cache/chroniclemap/max_autoresizes_" + sanitizedName;
 
-    metricMaker.newCallbackMetric(
-        PERCENTAGE_FREE_SPACE_METRIC,
-        Long.class,
-        new Description(
-            String.format("The amount of free space in the %s cache as a percentage", name)),
-        () -> (long) store.percentageFreeSpace());
+    callbacks.add(
+        metricMaker.newCallbackMetric(
+            PERCENTAGE_FREE_SPACE_METRIC,
+            Long.class,
+            new Description(
+                String.format("The amount of free space in the %s cache as a percentage", name)),
+            () -> (long) store.percentageFreeSpace()));
 
-    metricMaker.newCallbackMetric(
-        REMAINING_AUTORESIZES_METRIC,
-        Integer.class,
-        new Description(
-            String.format(
-                "The number of times the %s cache can automatically expand its capacity", name)),
-        store::remainingAutoResizes);
+    callbacks.add(
+        metricMaker.newCallbackMetric(
+            REMAINING_AUTORESIZES_METRIC,
+            Integer.class,
+            new Description(
+                String.format(
+                    "The number of times the %s cache can automatically expand its capacity",
+                    name)),
+            store::remainingAutoResizes));
 
-    metricMaker.newConstantMetric(
-        MAX_AUTORESIZES_METRIC,
-        store.maxAutoResizes(),
-        new Description(
-            String.format(
-                "The maximum number of times the %s cache can automatically expand its capacity",
-                name)));
+    callbacks.add(
+        metricMaker.newConstantMetric(
+            MAX_AUTORESIZES_METRIC,
+            store.maxAutoResizes(),
+            new Description(
+                String.format(
+                    "The maximum number of times the %s cache can automatically expand its capacity",
+                    name))));
+  }
+
+  void close() {
+    storePutFailures.remove();
+    callbacks.forEach(RegistrationHandle::remove);
+    callbacks.clear();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
index c74594f..287fe72 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.persistIndexEvery;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.H2_SUFFIX;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.getStats;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.jdbcUrl;
@@ -300,7 +301,8 @@
         stats.size() * sizeMultiplier,
         stats.avgKeySize(),
         stats.avgValueSize(),
-        maxBloatFactor);
+        maxBloatFactor,
+        persistIndexEvery());
   }
 
   private void doMigrate(
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java
deleted file mode 100644
index dcf7a68..0000000
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.modules.cache.chroniclemap;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class InMemoryLRU<K> {
-
-  private final Map<K, Boolean> LRUMap;
-
-  private static final Boolean dummyValue = Boolean.TRUE;
-  private final int capacity;
-
-  public InMemoryLRU(int capacity) {
-    this.capacity = capacity;
-
-    LRUMap =
-        Collections.synchronizedMap(
-            new LinkedHashMap<K, Boolean>(capacity, 0.75f, true) {
-              private static final long serialVersionUID = 1L;
-
-              @Override
-              protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
-                return size() > capacity;
-              }
-            });
-  }
-
-  public void add(K objKey) {
-    LRUMap.putIfAbsent(objKey, dummyValue);
-  }
-
-  public boolean contains(K key) {
-    return LRUMap.containsKey(key);
-  }
-
-  /**
-   * Remove a key from the map
-   *
-   * @param key element to remove from the map
-   * @return true when key was in the map, null otherwise
-   */
-  public Boolean remove(K key) {
-    return LRUMap.remove(key);
-  }
-
-  public void invalidateAll() {
-    LRUMap.clear();
-  }
-
-  public int size() {
-    return LRUMap.size();
-  }
-
-  @VisibleForTesting
-  protected Object[] toArray() {
-    return LRUMap.keySet().toArray();
-  }
-
-  public int getCapacity() {
-    return capacity;
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
index 53de07c..1998c3d 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import net.openhft.chronicle.bytes.Bytes;
 import net.openhft.chronicle.core.util.ReadResolvable;
 import net.openhft.chronicle.hash.serialization.BytesReader;
@@ -24,9 +25,11 @@
         ReadResolvable<KeyWrapperMarshaller<V>> {
 
   private final String name;
+  private final CacheSerializer<V> cacheSerializer;
 
   KeyWrapperMarshaller(String name) {
     this.name = name;
+    this.cacheSerializer = CacheSerializers.getKeySerializer(name);
   }
 
   @Override
@@ -34,13 +37,13 @@
     return new KeyWrapperMarshaller<>(name);
   }
 
-  @SuppressWarnings({"unchecked", "rawtypes"})
+  @SuppressWarnings("rawtypes")
   @Override
   public KeyWrapper<V> read(Bytes in, KeyWrapper<V> using) {
     int serializedLength = (int) in.readUnsignedInt();
     byte[] serialized = new byte[serializedLength];
     in.read(serialized, 0, serializedLength);
-    V v = (V) CacheSerializers.getKeySerializer(name).deserialize(serialized);
+    V v = cacheSerializer.deserialize(serialized);
     using = new KeyWrapper<>(v);
 
     return using;
@@ -49,7 +52,7 @@
   @SuppressWarnings("rawtypes")
   @Override
   public void write(Bytes out, KeyWrapper<V> toWrite) {
-    final byte[] serialized = CacheSerializers.getKeySerializer(name).serialize(toWrite.getValue());
+    final byte[] serialized = cacheSerializer.serialize(toWrite.getValue());
     out.writeUnsignedInt(serialized.length);
     out.write(serialized);
   }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java
new file mode 100644
index 0000000..098e8c6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Description.Units;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Metrics contains relations with objects created upon Gerrit start and as such it cannot be
+ * serialized. Upon the situation when store is being restored from a persistent file Marshaller
+ * gets deserialised and its transient metrics value needs to be re-initialised with object that was
+ * created upon the Gerrit start. Note that Marshaller creation is a step that prepends the cache
+ * build therefore it is ensured that metrics object is always available for cache's Marshaller.
+ */
+class SerializationMetricsForCache {
+  protected static class Metrics {
+    final Timer0 deserializeLatency;
+    final Timer0 serializeLatency;
+
+    private Metrics(MetricMaker metricMaker, String cacheName) {
+      String sanitizedName = metricMaker.sanitizeMetricName(cacheName);
+
+      deserializeLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/store_deserialize_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of deserializing entries from chronicle-map store for %s"
+                              + " cache",
+                          cacheName))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      serializeLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/store_serialize_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of serializing entries to chronicle-map store for %s cache",
+                          cacheName))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+    }
+  }
+
+  private static final Map<String, Metrics> metricsCache = new HashMap<>();
+
+  protected final String name;
+  protected final transient Metrics metrics;
+
+  protected SerializationMetricsForCache(MetricMaker metricMaker, String name) {
+    this(createMetrics(metricMaker, name), name);
+  }
+
+  protected SerializationMetricsForCache(Metrics metrics, String name) {
+    this.metrics = Optional.ofNullable(metrics).orElseGet(() -> metricsCache.get(name));
+    this.name = name;
+  }
+
+  private static Metrics createMetrics(MetricMaker metricMaker, String name) {
+    Metrics metrics = new Metrics(metricMaker, name);
+    metricsCache.put(name, metrics);
+    return metrics;
+  }
+
+  void close() {
+    metrics.deserializeLatency.remove();
+    metrics.serializeLatency.remove();
+    metricsCache.remove(name);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
index 6bb53bc..e6ccda2 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
@@ -16,9 +16,8 @@
 import com.google.common.base.Objects;
 
 public class TimedValue<V> {
-
-  private final V value;
-  private final long created;
+  private V value;
+  private long created;
 
   TimedValue(V value) {
     this.created = System.currentTimeMillis();
@@ -34,10 +33,18 @@
     return created;
   }
 
+  void setCreated(long created) {
+    this.created = created;
+  }
+
   public V getValue() {
     return value;
   }
 
+  void setValue(V value) {
+    this.value = value;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
index f85f57e..d3dd713 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
@@ -13,78 +13,85 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import java.nio.ByteBuffer;
 import net.openhft.chronicle.bytes.Bytes;
 import net.openhft.chronicle.core.util.ReadResolvable;
 import net.openhft.chronicle.hash.serialization.BytesReader;
 import net.openhft.chronicle.hash.serialization.BytesWriter;
 
-public class TimedValueMarshaller<V>
+public class TimedValueMarshaller<V> extends SerializationMetricsForCache
     implements BytesWriter<TimedValue<V>>,
         BytesReader<TimedValue<V>>,
         ReadResolvable<TimedValueMarshaller<V>> {
 
-  private final String name;
+  private static final ThreadLocal<byte[]> staticBuffer =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return new byte[Long.BYTES + Integer.BYTES];
+        }
+      };
 
-  TimedValueMarshaller(String name) {
-    this.name = name;
+  private final CacheSerializer<V> cacheSerializer;
+
+  TimedValueMarshaller(MetricMaker metricMaker, String name) {
+    super(metricMaker, name);
+    this.cacheSerializer = CacheSerializers.getValueSerializer(name);
+  }
+
+  private TimedValueMarshaller(Metrics metrics, String name) {
+    super(metrics, name);
+    this.cacheSerializer = CacheSerializers.getValueSerializer(name);
   }
 
   @Override
   public TimedValueMarshaller<V> readResolve() {
-    return new TimedValueMarshaller<>(name);
+    return new TimedValueMarshaller<>(metrics, name);
   }
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
+  @SuppressWarnings("rawtypes")
   @Override
   public TimedValue<V> read(Bytes in, TimedValue<V> using) {
-    long initialPosition = in.readPosition();
+    try (Timer0.Context timer = metrics.deserializeLatency.start()) {
+      byte[] bytesBuffer = staticBuffer.get();
+      in.read(bytesBuffer);
+      ByteBuffer buffer = ByteBuffer.wrap(bytesBuffer);
+      long created = buffer.getLong();
+      int vLength = buffer.getInt();
 
-    // Deserialize the creation timestamp (first 8 bytes)
-    byte[] serializedLong = new byte[Long.BYTES];
-    in.read(serializedLong, 0, Long.BYTES);
-    ByteBuffer buffer = ByteBuffer.wrap(serializedLong);
-    long created = buffer.getLong(0);
-    in.readPosition(initialPosition + Long.BYTES);
+      // Deserialize object V (remaining bytes)
+      byte[] serializedV = new byte[vLength];
+      in.read(serializedV);
+      V v = cacheSerializer.deserialize(serializedV);
 
-    // Deserialize the length of the serialized value (second 8 bytes)
-    byte[] serializedInt = new byte[Integer.BYTES];
-    in.read(serializedInt, 0, Integer.BYTES);
-    ByteBuffer buffer2 = ByteBuffer.wrap(serializedInt);
-    int vLength = buffer2.getInt(0);
-    in.readPosition(initialPosition + Long.BYTES + Integer.BYTES);
-
-    // Deserialize object V (remaining bytes)
-    byte[] serializedV = new byte[vLength];
-    in.read(serializedV, 0, vLength);
-    V v = (V) CacheSerializers.getValueSerializer(name).deserialize(serializedV);
-
-    using = new TimedValue<>(v, created);
-
-    return using;
+      if (using == null) {
+        using = new TimedValue<>(v, created);
+      } else {
+        using.setCreated(created);
+        using.setValue(v);
+      }
+      return using;
+    }
   }
 
   @SuppressWarnings("rawtypes")
   @Override
   public void write(Bytes out, TimedValue<V> toWrite) {
-    byte[] serialized = CacheSerializers.getValueSerializer(name).serialize(toWrite.getValue());
+    try (Timer0.Context timer = metrics.serializeLatency.start()) {
+      byte[] serialized = cacheSerializer.serialize(toWrite.getValue());
 
-    // Serialize as follows:
-    // created | length of serialized V | serialized value V
-    // 8 bytes |       4 bytes          | serialized_length bytes
-
-    int capacity = Long.BYTES + Integer.BYTES + serialized.length;
-    ByteBuffer buffer = ByteBuffer.allocate(capacity);
-
-    long timestamp = toWrite.getCreated();
-    buffer.putLong(0, timestamp);
-
-    buffer.position(Long.BYTES);
-    buffer.putInt(serialized.length);
-
-    buffer.position(Long.BYTES + Integer.BYTES);
-    buffer.put(serialized);
-
-    out.write(buffer.array());
+      // Serialize as follows:
+      // created | length of serialized V | serialized value V
+      // 8 bytes |       4 bytes          | serialized_length bytes
+      byte[] bytesBuffer = staticBuffer.get();
+      ByteBuffer buffer = ByteBuffer.wrap(bytesBuffer);
+      buffer.putLong(toWrite.getCreated());
+      buffer.putInt(serialized.length);
+      out.write(bytesBuffer);
+      out.write(serialized);
+    }
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 60d4ae1..f554b6b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -90,6 +90,26 @@
 
 Default: *90*
 
+* `cache.persistIndexEvery`
+: Duration (in seconds if not specified) between caches keys index persist
+operations.
+
+Note that in order to avoid race condition between evict and persist operations
+the latter is performed after the former is finished. Therefore the lowest
+persist resolution is 30s. Smaller values will be rounded up to 30s whereas
+higher values will be rounded down (if needed) to the closest multiple of 30s.
+From practical point of view it means that persist operation will be performed
+after every n-th evict operation is finished.
+For instance if `cache.persistIndexEvery = 2m` then persist will be called
+after every 4th eviction is finished.
+
+Notes:
+* regardless of `cache.persistIndexEvery` persist will be called on
+  Gerrit termination unless there is one already in progress.
+* persist operation is scheduled in a dedicated thread ('ChronicleMap-Index-0')
+
+Default: *15m*
+
 ### Defaults
 
 Unless overridden by configuration, sensible default values are be provided for
@@ -128,24 +148,6 @@
  If the cache expands you will see an increase in the available free space.
 [official documentation](https://javadoc.io/static/net.openhft/chronicle-map/3.20.83/net/openhft/chronicle/map/ChronicleMap.html#percentageFreeSpace--)
 
-* `percentageHotKeys`
-: The percentage of _hot_ keys that can be kept in-memory.
-When performing evictions, _hot_ keys will be preserved and only _cold_ keys
-will be evicted from chronicle-map, in random order.
-
-This value implies a trade-off between eviction speed and eviction accuracy.
-
-The smaller the number of hotKeys allocated, the quicker the eviction phase
-will be. However, this will increase the chance of evicting entries that were
-recently accessed.
-
-Conversely, the higher the number of hotKeys allocated, the higher will be the
-accuracy in evicting only recently accessed keys, at the price of a longer
-time spent doing evictions.
-
-In order to ensure there is always a cold entry to be evicted, the number of
-`percentageHotKeys` always needs to be less than `maxEntries`.
-
 *Constraints*: [1-99]
 *Default*: 50
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index 2928708..0dca368 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -20,11 +20,39 @@
 * cache/chroniclemap/max_autoresizes_<cache-name>
   : The maximum number of times the cache can automatically expand its capacity.
 
-* cache/chroniclemap/hot_keys_capacity_<cache-name>
-  : Constant number of hot keys for the cache that can be kept in memory.
+* cache/chroniclemap/keys_index_size_<cache-name>
+  : The number of index keys for the cache that are currently in memory.
 
-* cache/chroniclemap/hot_keys_size_<cache-name>
-  : The number of hot keys for the cache that are currently in memory.
+* cache/chroniclemap/keys_index_add_latency_<cache-name>
+  : The latency of adding cache key to an index.
 
-* "cache/chroniclemap/store_put_failures_<cache-name>
-  : The number of errors caught when inserting entries in chronicle-map store
\ No newline at end of file
+* cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_<cache-name>
+  : The latency of removing and consuming all keys older than expiration time for an index.
+
+* cache/chroniclemap/keys_index_remove_lru_key_latency_<cache-name>
+  : The latency of removing and consuming LRU key from an index.
+
+* cache/chroniclemap/keys_index_restore_latency_<cache-name>
+  : The latency of restoring an index from a file (performed once during the plugin start).
+
+* cache/chroniclemap/keys_index_persist_latency_<cache-name>
+  : The latency of persisting an index to a file.
+
+* cache/chroniclemap/store_serialize_latency_<cache-name>
+  : The latency of serializing entries in chronicle-map store
+
+* cache/chroniclemap/store_deserialize_latency_<cache-name>
+  : The latency of deserializing entries from chronicle-map store
+
+* cache/chroniclemap/store_put_failures_<cache-name>
+  : The number of errors caught when inserting entries in chronicle-map store
+
+* cache/chroniclemap/keys_index_restore_failures_<cache-name>
+  : The number of errors caught when restore cache index from file operation was performed
+
+* cache/chroniclemap/keys_index_persist_failures_<cache-name>
+  : The number of errors caught when persist cache index to file operation was performed
+
+* cache/chroniclemap/caches_without_configuration
+  : The number of caches that have no chronicle map configuration provided in `gerrit.config`
+    and fall back to defaults
diff --git a/src/main/resources/Documentation/migration.md b/src/main/resources/Documentation/migration.md
index 261f08f..f0aed00 100644
--- a/src/main/resources/Documentation/migration.md
+++ b/src/main/resources/Documentation/migration.md
@@ -103,4 +103,45 @@
 
 * size-multiplier=MULTIPLIER
 Multiplicative factor for the number of entries allowed in chronicle-map.
-*default:3*
\ No newline at end of file
+*default:3*
+
+
+### Reloading plugins with persistent caches backed by chroniclemap
+
+When chroniclemap store is initiated for a cache it locks exclusively the
+underlying file and keeps it until store is closed. Store is closed when Gerrit
+is stopped (for core caches) or when plugin is unloaded through either REST or
+SSH command. The later is problematic from the plugin `reload` command
+perspective as by default it unloads old version of plugin once new version is
+successfully loaded. Considering that old version holds the lock until it gets
+unloaded new version load will not succeed. As a result the following (or
+similar) error is visible in the log:
+
+```
+[2022-08-31T17:37:56.481+02:00] [SSH gerrit plugin reload test-cache-plugin (admin)] WARN  com.google.gerrit.server.plugins.PluginLoader : Cannot reload plugin test-cache-plugin
+com.google.inject.CreationException: Unable to create injector, see the following errors:
+
+1) [Guice/ErrorInCustomProvider]: ChronicleHashRecoveryFailedException: ChronicleFileLockException: Unable to acquire an exclusive file lock for gerrit/cache/test-cache-plugin.test_cache_0.dat. Make sure no other process is using the map.
+  at CacheModule.bindCache(CacheModule.java:188)
+      \_ installed by: Module -> TestCache$1
+  while locating Cache<String, String> annotated with @Named(value="test_cache")
+
+Learn more:
+  https://github.com/google/guice/wiki/ERROR_IN_CUSTOM_PROVIDER
+Caused by: ChronicleHashRecoveryFailedException: ChronicleFileLockException: Unable to acquire an exclusive file lock for gerrit/cache/test-cache-plugin.test_cache_0.dat. Make sure no other process is using the map.
+  at ChronicleMapBuilder.openWithExistingFile(ChronicleMapBuilder.java:1937)
+  at ChronicleMapBuilder.createWithFile(ChronicleMapBuilder.java:1706)
+  at ChronicleMapBuilder.recoverPersistedTo(ChronicleMapBuilder.java:1622)
+  ...
+```
+
+The following steps can be used in order to perform reload operation:
+
+1. perform the plugin `reload` in two steps by calling `remove` first and
+   following it with `add` command - the easiest way that doesn't require any
+   code modification
+
+2. add `Gerrit-ReloadMode: restart` to plugin's manifest so the when the plugin
+   `reload` command is called Gerrit unloads the old version prior loading the
+   new one - requires plugin's sources modification and build which might be
+   not an option in certain cases
\ No newline at end of file
diff --git a/src/main/resources/Documentation/tuning.md b/src/main/resources/Documentation/tuning.md
index 77078a4..7abb4d0 100644
--- a/src/main/resources/Documentation/tuning.md
+++ b/src/main/resources/Documentation/tuning.md
@@ -148,7 +148,7 @@
 * You can now run an the tuning command:
 
 ```bash
-ssh -p 29418 admin@<gerrit-server> cache-chroniclemap auto-adjust-caches [--dry-run] [cache-name]
+ssh -p 29418 admin@<gerrit-server> cache-chroniclemap auto-adjust-caches [--dry-run] [--adjust-caches-on-defaults] [cache-name]
 ```
 
 * You can also use the REST-API:
@@ -180,6 +180,18 @@
 to increase the size of a particular cache only you should be using this
 together with the `cache-name` parameter.
 
+* `--adjust-caches-on-defaults` or `-a` (SSH), `?adjust-caches-on-defaults` or `?a` (REST-API)
+  optional parameter
+
+The alternative to `cache-name` parameter that allows to tune all caches that are
+not configured in `gerrit.config` and thus are relying on defaults.
+Gerrit's admin can monitor `cache/chroniclemap/caches_without_configuration` metric
+(as described in
+[Detect caches that rely on defaults](#detect-caches-that-rely-on-defaults)
+section) to detect caches that rely on those defaults.
+Note that running `adjust-caches-on-defaults` in case when all caches are explicitly
+configured doesn't result in any tuning being performed.
+
 * `cache-name` (SSH), `?CACHE_NAME=cache-name` (REST-API) optional restriction of the caches
   to analyze and auto-tune. The parameter can be repeated multiple times for analyzing
   multiple caches. By default, analyze and adjust all persistent caches.
@@ -316,4 +328,19 @@
 4. restart gerrit-2
 
 Once you have tested gerrit-2 and you are happy with the results you can perform
-steps *1.* to *4.* for `gerrit-1`.
\ No newline at end of file
+steps *1.* to *4.* for `gerrit-1`.
+
+## Detect caches that rely on defaults
+
+Gerrit increments `cache/chroniclemap/caches_without_configuration`
+metric when persistent cache that has no configuration in `gerrit.config` IOW
+(relies on fallback defaults) is instantiated.
+
+*Notes:*
+* Additionally a warning is issued to the `error_log` but Gerrit's
+  core caches are initiated earlier than logging subsystem hence seeing them
+  requires starting the server in `run` mode.
+* One can use `auto-adjust-caches` command with `adjust-caches-on-defaults` option
+  (see the [auto-adjust-chronicle-map-caches](#auto-adjust-chronicle-map-caches)
+  section for details) in order to provide explicit defaults for caches in
+  question.
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
index 776ee45..5e4fe22 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
@@ -301,6 +301,23 @@
         .hasSize(1);
   }
 
+  @Test
+  public void shouldAdjustCachesOnDefaultsWhenSelected() throws Exception {
+    assertThat(
+            Joiner.on('\n').join(tunedFileNamesSet((n) -> n.contains(TEST_CACHE_FILENAME_TUNED))))
+        .isEmpty();
+
+    testCache.get(TEST_CACHE_KEY_100_CHARS);
+    String tuneResult = adminSshSession.exec(SSH_CMD + " --adjust-caches-on-defaults");
+    adminSshSession.assertSuccess();
+
+    assertThat(configResult(tuneResult, CONFIG_HEADER).getSubsections("cache"))
+        .contains(TEST_CACHE_NAME);
+    assertThat(
+            Joiner.on('\n').join(tunedFileNamesSet((n) -> n.contains(TEST_CACHE_FILENAME_TUNED))))
+        .isNotEmpty();
+  }
+
   private Config configResult(String result, @Nullable String configHeader)
       throws ConfigInvalidException {
     Config configResult = new Config();
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java
new file mode 100644
index 0000000..c8a8b66
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java
@@ -0,0 +1,150 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.CacheKeysIndex.tempIndexFile;
+import static java.util.stream.Collectors.toList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class CacheKeysIndexTest {
+  private static final String CACHE_NAME = "test-cache";
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private CacheKeysIndex<String> index;
+  private File indexFile;
+
+  @Before
+  public void setup() throws IOException {
+    CacheSerializers.registerCacheKeySerializer(CACHE_NAME, StringCacheSerializer.INSTANCE);
+    indexFile = temporaryFolder.newFolder().toPath().resolve("cache.index").toFile();
+    index = new CacheKeysIndex<>(new DisabledMetricMaker(), CACHE_NAME, indexFile, false);
+  }
+
+  @Test
+  public void add_shouldUpdateElementPositionWhenAlreadyInSet() {
+    index.add("foo", 2L);
+    index.add("bar", 1L);
+    assertThat(keys(index)).containsExactly("foo", "bar");
+
+    index.add("foo", 1L);
+    assertThat(keys(index)).containsExactly("bar", "foo");
+  }
+
+  @Test
+  public void add_shouldUpdateElementInsertionTimeWhenNewerGetsAdded() {
+    index.add("foo", 1L);
+    index.add("foo", 2L);
+
+    CacheKeysIndex<String>.TimedKey key = index.keys().iterator().next();
+    assertThat(key.getKey()).isEqualTo("foo");
+    assertThat(key.getCreated()).isEqualTo(2L);
+  }
+
+  @Test
+  public void refresh_shouldRefreshInsertionTimeOnRefresh() {
+    index.add("foo", 1L);
+    index.refresh("foo");
+
+    CacheKeysIndex<String>.TimedKey key = index.keys().iterator().next();
+    assertThat(key.getKey()).isEqualTo("foo");
+    assertThat(key.getCreated()).isGreaterThan(1L);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldRemoveAndConsumeKeysOlderThan() {
+    long than = 5l;
+    index.add("newerThan", 10L);
+    index.add("olderThan", 2L);
+    Consumer<String> consumer = mock(Consumer.class);
+
+    index.removeAndConsumeKeysOlderThan(than, consumer);
+
+    verify(consumer).accept("olderThan");
+    assertThat(keys(index)).containsExactly("newerThan");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldReturnFalseIfThereIsNoLruToRemove() {
+    Consumer<String> consumer = mock(Consumer.class);
+
+    boolean actual = index.removeAndConsumeLruKey(consumer);
+
+    assertThat(actual).isEqualTo(false);
+    verifyNoInteractions(consumer);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldRemoveAndConsumeLruKey() {
+    index.add("older", 1L);
+    index.add("newer", 1L);
+    Consumer<String> consumer = mock(Consumer.class);
+
+    boolean actual = index.removeAndConsumeLruKey(consumer);
+
+    assertThat(actual).isEqualTo(true);
+    verify(consumer).accept("older");
+    assertThat(keys(index)).containsExactly("newer");
+  }
+
+  @Test
+  public void persist_shouldPersistAndRestoreKeys() {
+    index.add("older", 1L);
+    index.add("newer", 1L);
+    assertThat(indexFile.exists()).isFalse();
+
+    index.persist();
+    assertThat(indexFile.isFile()).isTrue();
+    assertThat(indexFile.canRead()).isTrue();
+
+    index.clear();
+    assertThat(keys(index)).isEmpty();
+
+    index.restore(true);
+    assertThat(keys(index)).containsExactly("newer", "older");
+  }
+
+  @Test
+  public void restore_shouldDeleteExistingTemporaryIndexStorageFileDuringRestore()
+      throws IOException {
+    File indexTempFile = tempIndexFile(indexFile);
+    indexTempFile.createNewFile();
+    assertThat(indexTempFile.isFile()).isTrue();
+
+    index.restore(true);
+
+    assertThat(indexTempFile.exists()).isFalse();
+  }
+
+  private static List<String> keys(CacheKeysIndex<String> index) {
+    return index.keys().stream().map(CacheKeysIndex.TimedKey::getKey).collect(toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java
new file mode 100644
index 0000000..f3e909d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.AbstractDaemonTest;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.UseSsh;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults;
+import java.util.Set;
+import org.junit.Test;
+
+@UseLocalDisk
+@UseSsh
+public class ChronicleMapCacheConfigDefaultsIT extends AbstractDaemonTest {
+  @Override
+  public ChronicleMapCacheModule createModule() {
+    // CacheSerializers is accumulating cache names from different test executions in CI therefore
+    // it has to be cleared before this test
+    CacheSerializers.clear();
+    return new ChronicleMapCacheModule();
+  }
+
+  @Test
+  // the following caches are not persisted by default hence `diskLimit` needs to be set so that
+  // Gerrit persists them
+  @GerritConfig(name = "cache.change_notes.diskLimit", value = "1")
+  @GerritConfig(name = "cache.external_ids_map.diskLimit", value = "1")
+  public void shouldAllPersistentCachesHaveDefaultConfiguration() throws Exception {
+    Set<String> allCaches = CacheSerializers.getSerializersNames();
+    assertThat(Defaults.defaultMap.keySet()).containsExactlyElementsIn(allCaches);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
index 8cb28c1..d030732 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
@@ -14,13 +14,16 @@
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
 import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_AVG_KEY_SIZE;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_AVG_VALUE_SIZE;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_MAX_BLOAT_FACTOR;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_MAX_ENTRIES;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD;
-import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERCENTAGE_HOT_KEYS;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERSIST_INDEX_EVERY;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheFactory.PRUNE_DELAY;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.config.SitePaths;
 import java.io.File;
@@ -33,7 +36,11 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class ChronicleMapCacheConfigTest {
 
   private final String cacheDirectory = ".";
@@ -47,6 +54,8 @@
   private SitePaths sitePaths;
   private StoredConfig gerritConfig;
 
+  @Mock CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetricMock;
+
   @Before
   public void setUp() throws Exception {
     sitePaths = new SitePaths(temporaryFolder.newFolder().toPath());
@@ -61,10 +70,10 @@
   }
 
   @Test
-  public void shouldProvidePersistedFile() throws Exception {
+  public void shouldProvideCacheFile() throws Exception {
     assertThat(
             configUnderTest(gerritConfig)
-                .getPersistedFile()
+                .getCacheFile()
                 .toPath()
                 .getParent()
                 .toRealPath()
@@ -73,6 +82,19 @@
   }
 
   @Test
+  public void shouldProvideIndexFileThatIsRelatedToCacheFile() {
+    ChronicleMapCacheConfig config = configUnderTest(gerritConfig);
+    File cacheFile = config.getCacheFile();
+    File indexFile = config.getIndexFile();
+
+    assertThat(indexFile.getParentFile()).isEqualTo(cacheFile.getParentFile());
+    String cacheFileName = cacheFile.getName();
+    assertThat(indexFile.getName())
+        .isEqualTo(
+            String.format("%s.index", cacheFileName.substring(0, cacheFileName.indexOf(".dat"))));
+  }
+
+  @Test
   public void shouldProvideConfiguredMaxEntriesWhenDefined() throws Exception {
     long maxEntries = 10;
     gerritConfig.setLong("cache", cacheKey, "maxEntries", maxEntries);
@@ -178,38 +200,48 @@
   }
 
   @Test
-  public void shouldProvidePercentageHotKeysDefault() throws Exception {
-    assertThat(configUnderTest(gerritConfig).getpercentageHotKeys())
-        .isEqualTo(DEFAULT_PERCENTAGE_HOT_KEYS);
+  public void shouldProvideDefaultIndexPersistEveryValuesWhenNotConfigured() {
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery()).isEqualTo(DEFAULT_PERSIST_INDEX_EVERY);
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(30L);
   }
 
   @Test
-  public void shouldProvidePercentageHotKeysWhenConfigured() throws Exception {
-    int percentageHotKeys = 20;
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", percentageHotKeys);
-    gerritConfig.save();
-
-    assertThat(configUnderTest(gerritConfig).getpercentageHotKeys()).isEqualTo(percentageHotKeys);
+  public void shouldPersistIndexEveryBePruneDelayWhenPersistIndexEveryIsLowerThanPruneDelay() {
+    gerritConfig.setString(
+        "cache", null, "persistIndexEvery", String.format("%ds", PRUNE_DELAY - 1L));
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery()).isEqualTo(Duration.ofSeconds(PRUNE_DELAY));
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(1L);
   }
 
   @Test
-  public void shouldThrowWhenPercentageHotKeysIs100() throws Exception {
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", 100);
-    gerritConfig.save();
-
-    assertThrows(IllegalArgumentException.class, () -> configUnderTest(gerritConfig));
+  public void shouldPersistIndexEveryBeRoundedDownToAMultiplyOfPruneDelay() {
+    gerritConfig.setString(
+        "cache", null, "persistIndexEvery", String.format("%ds", 2L * PRUNE_DELAY + 1L));
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery())
+        .isEqualTo(Duration.ofSeconds(2L * PRUNE_DELAY));
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(2L);
   }
 
   @Test
-  public void shouldThrowWhenPercentageHotKeysIs0() throws Exception {
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", 0);
-    gerritConfig.save();
+  public void shouldIncrementCacheMetricWhenCacheHasNoDedicatedConfiguration() {
+    configUnderTest(gerritConfig);
 
-    assertThrows(IllegalArgumentException.class, () -> configUnderTest(gerritConfig));
+    verify(cachesWithoutConfigMetricMock, atLeastOnce()).incrementForCache(cacheKey);
+  }
+
+  @Test
+  public void shouldNotIncrementCacheMetricWhenCacheHasAtLeastSingleParameterConfigured() {
+    gerritConfig.setLong("cache", cacheKey, "maxEntries", 1L);
+    configUnderTest(gerritConfig);
+
+    verify(cachesWithoutConfigMetricMock, never()).incrementForCache(cacheName);
   }
 
   private ChronicleMapCacheConfig configUnderTest(StoredConfig gerritConfig) {
-    File persistentFile =
+    File cacheFile =
         ChronicleMapCacheFactory.fileName(
             sitePaths.site_path.resolve(cacheDirectory), cacheName, version);
     sitePaths
@@ -218,6 +250,11 @@
         .toFile();
 
     return new ChronicleMapCacheConfig(
-        gerritConfig, cacheKey, persistentFile, expireAfterWrite, refreshAfterWrite);
+        gerritConfig,
+        cachesWithoutConfigMetricMock,
+        cacheKey,
+        cacheFile,
+        expireAfterWrite,
+        refreshAfterWrite);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
index 444762b..389fbb1 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
@@ -17,7 +17,6 @@
 import static com.google.common.truth.Truth8.assertThat;
 
 import com.google.common.cache.Cache;
-import com.google.common.truth.Truth8;
 import com.google.gerrit.acceptance.AbstractDaemonTest;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.entities.Project;
@@ -68,7 +67,7 @@
     String newProjectName = name("newProject");
     adminRestSession.put("/projects/" + newProjectName).assertCreated();
 
-    Truth8.assertThat(projectCache.get(Project.nameKey(newProjectName))).isPresent();
+    assertThat(projectCache.get(Project.nameKey(newProjectName))).isPresent();
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
index 0041c53..b943c07 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
@@ -16,10 +16,12 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.Mockito.mock;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.gerrit.acceptance.AbstractDaemonTest;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.WaitUtil;
@@ -306,7 +308,6 @@
     final String fooValue = "foo";
 
     gerritConfig.setInt("cache", testCacheName, "maxEntries", 2);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", 10);
     gerritConfig.setInt("cache", testCacheName, "avgKeySize", "foo1".getBytes().length);
     gerritConfig.setInt("cache", testCacheName, "avgValueSize", valueSize(fooValue));
     gerritConfig.save();
@@ -326,7 +327,6 @@
       throws Exception {
     final String fooValue = "foo";
     gerritConfig.setInt("cache", testCacheName, "maxEntries", 2);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", 10);
     gerritConfig.setInt("cache", testCacheName, "avgKeySize", "foo1".getBytes().length);
     gerritConfig.setInt("cache", testCacheName, "avgValueSize", valueSize(fooValue));
     gerritConfig.save();
@@ -347,7 +347,6 @@
   public void shouldEvictEntriesUntilFreeSpaceIsRecovered() throws Exception {
     final int uuidSize = valueSize(UUID.randomUUID().toString());
     gerritConfig.setInt("cache", "foo", "maxEntries", 50);
-    gerritConfig.setInt("cache", "foo", "percentageHotKeys", 10);
     gerritConfig.setInt("cache", "foo", "avgKeySize", uuidSize);
     gerritConfig.setInt("cache", "foo", "avgValueSize", uuidSize);
     gerritConfig.save();
@@ -458,104 +457,109 @@
   }
 
   @Test
-  public void shouldTriggerHotKeysCapacityCacheMetric() throws Exception {
+  public void shouldTriggerKeysIndexSizeCacheMetric() throws Exception {
     String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 60;
     int maxEntries = 10;
-    int expectedCapacity = 6;
-    String hotKeysCapacityMetricName = "cache/chroniclemap/hot_keys_capacity_" + testCacheName;
-    gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
-    gerritConfig.save();
-
-    newCacheWithMetrics(testCacheName, cachedValue);
-
-    assertThat(getMetric(hotKeysCapacityMetricName).getValue()).isEqualTo(expectedCapacity);
-  }
-
-  @Test
-  public void shouldTriggerHotKeysSizeCacheMetric() throws Exception {
-    String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 30;
-    int maxEntries = 10;
-    int maxHotKeyCapacity = 3;
+    int expectedKeysSize = 3;
     final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
-    String hotKeysSizeMetricName = "cache/chroniclemap/hot_keys_size_" + testCacheName;
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + testCacheName;
     gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
     gerritConfig.save();
 
     ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, cachedValue);
 
-    assertThat(getMetric(hotKeysSizeMetricName).getValue()).isEqualTo(0);
+    assertThat(getMetric(keysIndexSizeMetricName).getValue()).isEqualTo(0);
 
-    for (int i = 0; i < maxHotKeyCapacity; i++) {
+    for (int i = 0; i < expectedKeysSize; i++) {
       cache.put(cachedValue + i, cachedValue);
     }
 
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == maxHotKeyCapacity,
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == expectedKeysSize,
         METRIC_TRIGGER_TIMEOUT);
-
-    cache.put(cachedValue + maxHotKeyCapacity + 1, cachedValue);
-
-    assertThrows(
-        InterruptedException.class,
-        () ->
-            WaitUtil.waitUntil(
-                () -> (int) getMetric(hotKeysSizeMetricName).getValue() > maxHotKeyCapacity,
-                METRIC_TRIGGER_TIMEOUT));
   }
 
   @Test
-  public void shouldResetHotKeysWhenInvalidateAll() throws Exception {
-    String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 30;
+  public void shouldTriggerKeysIndexAddLatencyCacheMetric() throws Exception {
     int maxEntries = 10;
-    int maxHotKeyCapacity = 3;
     final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
-    String hotKeysSizeMetricName = "cache/chroniclemap/hot_keys_size_" + testCacheName;
+    String keysIndexAddLatencyMetricName =
+        "cache/chroniclemap/keys_index_add_latency_" + testCacheName;
     gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
+    gerritConfig.save();
+
+    ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, null);
+    assertThat(getTimer(keysIndexAddLatencyMetricName).getCount()).isEqualTo(0L);
+
+    String cachedValue = UUID.randomUUID().toString();
+    cache.put(cachedValue, cachedValue);
+
+    WaitUtil.waitUntil(
+        () -> getTimer(keysIndexAddLatencyMetricName).getCount() == 1L, METRIC_TRIGGER_TIMEOUT);
+  }
+
+  @Test
+  public void shouldResetKeysIndexWhenInvalidateAll() throws Exception {
+    String cachedValue = UUID.randomUUID().toString();
+    int maxEntries = 10;
+    int expectedKeysSize = 3;
+    final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + testCacheName;
+    gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
     gerritConfig.save();
 
     ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, cachedValue);
 
-    for (int i = 0; i < maxHotKeyCapacity; i++) {
+    for (int i = 0; i < expectedKeysSize; i++) {
       cache.put(cachedValue + i, cachedValue);
     }
 
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == maxHotKeyCapacity,
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == expectedKeysSize,
         METRIC_TRIGGER_TIMEOUT);
-
     cache.invalidateAll();
-
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == 0, METRIC_TRIGGER_TIMEOUT);
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == 0, METRIC_TRIGGER_TIMEOUT);
   }
 
   @Test
   public void shouldSanitizeUnwantedCharsInMetricNames() throws Exception {
     String cacheName = "very+confusing.cache#name";
     String sanitized = "very_confusing_cache_name";
-    String hotKeySizeMetricName = "cache/chroniclemap/hot_keys_size_" + sanitized;
     String percentageFreeMetricName = "cache/chroniclemap/percentage_free_space_" + sanitized;
     String autoResizeMetricName = "cache/chroniclemap/remaining_autoresizes_" + sanitized;
     String maxAutoResizeMetricName = "cache/chroniclemap/max_autoresizes_" + sanitized;
-    String hotKeyCapacityMetricName = "cache/chroniclemap/hot_keys_capacity_" + sanitized;
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + sanitized;
+    String keysIndexAddLatencyMetricName = "cache/chroniclemap/keys_index_add_latency_" + sanitized;
+    String keysIndexRemoveOlderThanLatencyMetricName =
+        "cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_" + sanitized;
+    String keysIndexRemoveLruLatencyMetricName =
+        "cache/chroniclemap/keys_index_remove_lru_key_latency_" + sanitized;
+    String keysIndexRestoreName = "cache/chroniclemap/keys_index_restore_latency_" + sanitized;
+    String keysIndexPersistName = "cache/chroniclemap/keys_index_persist_latency_" + sanitized;
+    String keysIndexRestoreFailuresName =
+        "cache/chroniclemap/keys_index_restore_failures_" + sanitized;
+    String keysIndexPersistFailuresName =
+        "cache/chroniclemap/keys_index_persist_failures_" + sanitized;
 
     newCacheWithMetrics(cacheName, null);
 
-    getMetric(hotKeySizeMetricName);
     getMetric(percentageFreeMetricName);
     getMetric(autoResizeMetricName);
     getMetric(maxAutoResizeMetricName);
-    getMetric(hotKeyCapacityMetricName);
+    getMetric(keysIndexSizeMetricName);
+    getTimer(keysIndexAddLatencyMetricName);
+    getTimer(keysIndexRemoveOlderThanLatencyMetricName);
+    getTimer(keysIndexRemoveLruLatencyMetricName);
+    getTimer(keysIndexRestoreName);
+    getTimer(keysIndexPersistName);
+    getCounter(keysIndexRestoreFailuresName);
+    getCounter(keysIndexPersistFailuresName);
   }
 
   private int valueSize(String value) {
-    final TimedValueMarshaller<String> marshaller = new TimedValueMarshaller<>(testCacheName);
+    final TimedValueMarshaller<String> marshaller =
+        new TimedValueMarshaller<>(metricMaker, testCacheName);
 
     Bytes<ByteBuffer> out = Bytes.elasticByteBuffer();
     marshaller.write(out, new TimedValue<>(value));
@@ -608,6 +612,7 @@
     ChronicleMapCacheConfig config =
         new ChronicleMapCacheConfig(
             gerritConfig,
+            mock(CachesWithoutChronicleMapConfigMetric.class),
             cacheDef.configKey(),
             persistentFile,
             expireAfterWrite != null ? expireAfterWrite : Duration.ZERO,
@@ -654,4 +659,10 @@
     assertWithMessage(name).that(counter).isNotNull();
     return counter;
   }
+
+  private Timer getTimer(String name) {
+    Timer timer = (Timer) metricRegistry.getMetrics().get(name);
+    assertWithMessage(name).that(timer).isNotNull();
+    return timer;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java
deleted file mode 100644
index 3d435c9..0000000
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.modules.cache.chroniclemap;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import org.junit.Test;
-
-public class InMemoryLRUTest {
-
-  @Test
-  public void add_shouldUpdateElementPositionWhenAlreadyInSet() {
-    final InMemoryLRU<Object> map = new InMemoryLRU<>(2);
-
-    map.add("A");
-    map.add("B");
-
-    assertThat(map.toArray()).asList().containsExactly("A", "B");
-
-    map.add("A");
-    assertThat(map.toArray()).asList().containsExactly("B", "A");
-  }
-
-  @Test
-  public void add_shouldEvictLRUElement() {
-    final InMemoryLRU<Object> map = new InMemoryLRU<>(2);
-
-    map.add("A");
-    map.add("B");
-    map.add("C");
-
-    assertThat(map.toArray()).asList().containsExactly("B", "C");
-  }
-
-  @Test
-  public void remove_unexistingEntryShouldReturnNull() {
-    InMemoryLRU<Object> map = new InMemoryLRU<>(1);
-
-    assertThat(map.remove("foo")).isNull();
-  }
-
-  @Test
-  public void remove_unexistingEntryShouldReturnTrue() {
-    InMemoryLRU<Object> map = new InMemoryLRU<>(1);
-
-    map.add("foo");
-
-    assertThat(map.remove("foo")).isTrue();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
index b630e36..b4eb4ca 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
@@ -15,6 +15,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
+import com.google.gerrit.acceptance.TestMetricMaker;
 import com.google.gerrit.server.cache.serialize.ObjectIdCacheSerializer;
 import java.nio.ByteBuffer;
 import net.openhft.chronicle.bytes.Bytes;
@@ -35,7 +36,8 @@
   public void shouldSerializeAndDeserializeBack() {
     ObjectId id = ObjectId.fromString("1234567890123456789012345678901234567890");
     long timestamp = 1600329018L;
-    TimedValueMarshaller<ObjectId> marshaller = new TimedValueMarshaller<>(TEST_CACHE_NAME);
+    TimedValueMarshaller<ObjectId> marshaller =
+        new TimedValueMarshaller<>(new TestMetricMaker(), TEST_CACHE_NAME);
 
     final TimedValue<ObjectId> wrapped = new TimedValue<>(id, timestamp);