diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
index 948aaee..a07d8c4 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCaches.java
@@ -27,6 +27,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,8 +53,10 @@
   private final ChronicleMapCacheConfig.Factory configFactory;
   private final Path cacheDir;
   private final AdministerCachePermission adminCachePermission;
+  private final CachesWithoutChronicleMapConfigMetric metric;
 
   private boolean dryRun;
+  private boolean adjustCachesOnDefaults;
   private Optional<Long> optionalMaxEntries = Optional.empty();
   private Set<String> cacheNames = new HashSet<>();
 
@@ -63,11 +66,13 @@
       SitePaths site,
       DynamicMap<Cache<?, ?>> cacheMap,
       ChronicleMapCacheConfig.Factory configFactory,
-      AdministerCachePermission adminCachePermission) {
+      AdministerCachePermission adminCachePermission,
+      CachesWithoutChronicleMapConfigMetric metric) {
     this.cacheMap = cacheMap;
     this.configFactory = configFactory;
     this.cacheDir = getCacheDir(site, cfg.getString("cache", null, "directory"));
     this.adminCachePermission = adminCachePermission;
+    this.metric = metric;
   }
 
   public boolean isDryRun() {
@@ -78,6 +83,14 @@
     this.dryRun = dryRun;
   }
 
+  public boolean isAdjustCachesOnDefaults() {
+    return adjustCachesOnDefaults;
+  }
+
+  public void setAdjustCachesOnDefaults(boolean adjustCachesOnDefaults) {
+    this.adjustCachesOnDefaults = adjustCachesOnDefaults;
+  }
+
   public Optional<Long> getOptionalMaxEntries() {
     return optionalMaxEntries;
   }
@@ -206,13 +219,14 @@
 
     return configFactory.createWithValues(
         currentChronicleMapConfig.getConfigKey(),
-        resolveNewFile(currentChronicleMapConfig.getPersistedFile().getName()),
+        resolveNewFile(currentChronicleMapConfig.getCacheFile().getName()),
         currentChronicleMapConfig.getExpireAfterWrite(),
         currentChronicleMapConfig.getRefreshAfterWrite(),
         newMaxEntries,
         averageKeySize,
         averageValueSize,
-        currentChronicleMapConfig.getMaxBloatFactor());
+        currentChronicleMapConfig.getMaxBloatFactor(),
+        currentChronicleMapConfig.getPersistIndexEvery());
   }
 
   private long newMaxEntries(ChronicleMapCacheImpl<Object, Object> currentCache) {
@@ -261,6 +275,13 @@
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   private Map<String, ChronicleMapCacheImpl<Object, Object>> getChronicleMapCaches() {
+    if (isAdjustCachesOnDefaults()) {
+      if (metric.cachesOnDefaults().isEmpty()) {
+        return Collections.emptyMap();
+      }
+      cacheNames.addAll(metric.cachesOnDefaults());
+    }
+
     return cacheMap.plugins().stream()
         .map(cacheMap::byPlugin)
         .flatMap(
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
index 580bd98..9e7efd3 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesCommand.java
@@ -49,6 +49,14 @@
     autoAdjustCachesEngine.setOptionalMaxEntries(Optional.of(maxEntries));
   }
 
+  @Option(
+      name = "--adjust-caches-on-defaults",
+      aliases = {"-a"},
+      usage = "Adjust caches that fall back to default configuration.")
+  public void setAdjustCachesOnDefaults(boolean adjustCachesOnDefaults) {
+    autoAdjustCachesEngine.setAdjustCachesOnDefaults(adjustCachesOnDefaults);
+  }
+
   @Argument(
       index = 0,
       required = false,
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
index 49f7a97..91950ec 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesServlet.java
@@ -57,6 +57,11 @@
             .or(() -> Optional.ofNullable(req.getParameter("d")))
             .isPresent());
 
+    autoAdjustCachesEngine.setAdjustCachesOnDefaults(
+        Optional.ofNullable(req.getParameter("adjust-caches-on-defaults"))
+            .or(() -> Optional.ofNullable(req.getParameter("a")))
+            .isPresent());
+
     autoAdjustCachesEngine.setOptionalMaxEntries(
         Optional.ofNullable(req.getParameter("max-entries"))
             .or(() -> Optional.ofNullable(req.getParameter("m")))
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java
new file mode 100644
index 0000000..0730380
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndex.java
@@ -0,0 +1,379 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static java.util.stream.Collectors.toUnmodifiableSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.errorprone.annotations.CompatibleWith;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Description.Units;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+class CacheKeysIndex<T> {
+  /** As opposed to TimedValue keys are equal when their key equals. */
+  class TimedKey {
+    private final T key;
+    private final long created;
+
+    private TimedKey(T key, long created) {
+      this.key = key;
+      this.created = created;
+    }
+
+    T getKey() {
+      return key;
+    }
+
+    long getCreated() {
+      return created;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o instanceof CacheKeysIndex.TimedKey) {
+        @SuppressWarnings("unchecked")
+        TimedKey other = (TimedKey) o;
+        return Objects.equals(key, other.key);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key);
+    }
+  }
+
+  private class Metrics {
+    private final RegistrationHandle indexSize;
+    private final Timer0 addLatency;
+    private final Timer0 removeAndConsumeOlderThanLatency;
+    private final Timer0 removeAndConsumeLruKeyLatency;
+    private final Timer0 restoreLatency;
+    private final Timer0 persistLatency;
+    private final Counter0 restoreFailures;
+    private final Counter0 persistFailures;
+
+    private Metrics(MetricMaker metricMaker, String name) {
+      String sanitizedName = metricMaker.sanitizeMetricName(name);
+
+      indexSize =
+          metricMaker.newCallbackMetric(
+              "cache/chroniclemap/keys_index_size_" + sanitizedName,
+              Integer.class,
+              new Description(
+                  String.format(
+                      "The number of cache index keys for %s cache that are currently in memory",
+                      name)),
+              keys::size);
+
+      addLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_add_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of adding key to the index for %s cache", name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      removeAndConsumeOlderThanLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_"
+                  + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of removing and consuming all keys older than expiration"
+                              + " time for the index for %s cache",
+                          name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      removeAndConsumeLruKeyLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_remove_lru_key_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of removing and consuming LRU key from the index for %s"
+                              + " cache",
+                          name))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      restoreLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_restore_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of restoring %s cache's index from file", name))
+                  .setCumulative()
+                  .setUnit(Units.MICROSECONDS));
+
+      persistLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/keys_index_persist_latency_" + sanitizedName,
+              new Description(
+                      String.format("The latency of perststing %s cache's index to file", name))
+                  .setCumulative()
+                  .setUnit(Units.MICROSECONDS));
+
+      restoreFailures =
+          metricMaker.newCounter(
+              "cache/chroniclemap/keys_index_restore_failures_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The number of errors caught when restore %s cache index from file operation was performed: ",
+                          name))
+                  .setCumulative()
+                  .setUnit("errors"));
+
+      persistFailures =
+          metricMaker.newCounter(
+              "cache/chroniclemap/keys_index_persist_failures_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The number of errors caught when persist %s cache index to file operation was performed: ",
+                          name))
+                  .setCumulative()
+                  .setUnit("errors"));
+    }
+
+    private void close() {
+      indexSize.remove();
+      addLatency.remove();
+      removeAndConsumeOlderThanLatency.remove();
+      removeAndConsumeLruKeyLatency.remove();
+      restoreLatency.remove();
+      persistLatency.remove();
+      restoreFailures.remove();
+      persistFailures.remove();
+    }
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Set<TimedKey> keys;
+  private final Metrics metrics;
+  private final String name;
+  private final File indexFile;
+  private final File tempIndexFile;
+  private final AtomicBoolean persistInProgress;
+
+  CacheKeysIndex(MetricMaker metricMaker, String name, File indexFile, boolean cacheFileExists) {
+    this.keys = Collections.synchronizedSet(new LinkedHashSet<>());
+    this.metrics = new Metrics(metricMaker, name);
+    this.name = name;
+    this.indexFile = indexFile;
+    this.tempIndexFile = tempIndexFile(indexFile);
+    this.persistInProgress = new AtomicBoolean(false);
+    restore(cacheFileExists);
+  }
+
+  @SuppressWarnings("unchecked")
+  void add(@CompatibleWith("T") Object key, long created) {
+    Objects.requireNonNull(key, "Key value cannot be [null].");
+    TimedKey timedKey = new TimedKey((T) key, created);
+
+    // bubble up MRU key by re-adding it to a set
+    try (Timer0.Context timer = metrics.addLatency.start()) {
+      keys.remove(timedKey);
+      keys.add(timedKey);
+    }
+  }
+
+  void refresh(T key) {
+    add(key, System.currentTimeMillis());
+  }
+
+  void removeAndConsumeKeysOlderThan(long time, Consumer<T> consumer) {
+    try (Timer0.Context timer = metrics.removeAndConsumeOlderThanLatency.start()) {
+      Set<TimedKey> toRemoveAndConsume;
+      synchronized (keys) {
+        toRemoveAndConsume =
+            keys.stream().filter(key -> key.created < time).collect(toUnmodifiableSet());
+      }
+      toRemoveAndConsume.forEach(
+          key -> {
+            keys.remove(key);
+            consumer.accept(key.getKey());
+          });
+    }
+  }
+
+  boolean removeAndConsumeLruKey(Consumer<T> consumer) {
+    try (Timer0.Context timer = metrics.removeAndConsumeLruKeyLatency.start()) {
+      Optional<TimedKey> lruKey;
+      synchronized (keys) {
+        lruKey = keys.stream().findFirst();
+      }
+
+      return lruKey
+          .map(
+              key -> {
+                keys.remove(key);
+                consumer.accept(key.getKey());
+                return true;
+              })
+          .orElse(false);
+    }
+  }
+
+  void invalidate(@CompatibleWith("T") Object key) {
+    keys.remove(key);
+  }
+
+  void clear() {
+    keys.clear();
+  }
+
+  @VisibleForTesting
+  Set<TimedKey> keys() {
+    return keys;
+  }
+
+  void persist() {
+    if (!persistInProgress.compareAndSet(false, true)) {
+      logger.atWarning().log(
+          "Persist cache keys index %s to %s file is already in progress. This persist request was"
+              + " skipped.",
+          name, indexFile);
+      return;
+    }
+
+    logger.atInfo().log("Persisting cache keys index %s to %s file", name, indexFile);
+    try (Timer0.Context timer = metrics.persistLatency.start()) {
+      ArrayList<TimedKey> toPersist;
+      synchronized (keys) {
+        toPersist = new ArrayList<>(keys);
+      }
+      CacheSerializer<T> serializer = CacheSerializers.getKeySerializer(name);
+      try (DataOutputStream dos =
+          new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tempIndexFile)))) {
+        for (TimedKey key : toPersist) {
+          writeKey(serializer, dos, key);
+        }
+        dos.flush();
+        indexFile.delete();
+        if (!tempIndexFile.renameTo(indexFile)) {
+          logger.atWarning().log(
+              "Renaming temporary index file %s to %s was not successful",
+              tempIndexFile, indexFile);
+          metrics.persistFailures.increment();
+          return;
+        }
+        logger.atInfo().log("Cache keys index %s was persisted to %s file", name, indexFile);
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Persisting cache keys index %s failed", name);
+        metrics.persistFailures.increment();
+      }
+    } finally {
+      persistInProgress.set(false);
+    }
+  }
+
+  void restore(boolean cacheFileExists) {
+    try {
+      if (tempIndexFile.isFile()) {
+        logger.atWarning().log(
+            "Gerrit was not closed properly as index persist operation was not finished: temporary"
+                + " index storage file %s exists for %s cache.",
+            tempIndexFile, name);
+        metrics.restoreFailures.increment();
+        if (!tempIndexFile.delete()) {
+          logger.atSevere().log(
+              "Cannot delete the temporary index storage file %s.", tempIndexFile);
+          return;
+        }
+      }
+
+      if (!indexFile.isFile() || !indexFile.canRead()) {
+        if (cacheFileExists) {
+          logger.atWarning().log(
+              "Restoring cache keys index %s not possible. File %s doesn't exist or cannot be read."
+                  + " Existing persisted entries will be pruned only when they are accessed after"
+                  + " Gerrit start.",
+              name, indexFile.getPath());
+          metrics.restoreFailures.increment();
+        }
+        return;
+      }
+
+      logger.atInfo().log("Restoring cache keys index %s from %s file", name, indexFile);
+      try (Timer0.Context timer = metrics.restoreLatency.start()) {
+        CacheSerializer<T> serializer = CacheSerializers.getKeySerializer(name);
+        try (DataInputStream dis =
+            new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))) {
+          while (dis.available() > 0) {
+            keys.add(readKey(serializer, dis));
+          }
+          logger.atInfo().log("Cache keys index %s was restored from %s file", name, indexFile);
+        }
+      }
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("Restoring cache keys index %s failed", name);
+      metrics.restoreFailures.increment();
+    }
+  }
+
+  void close() {
+    persist();
+    metrics.close();
+  }
+
+  static final File tempIndexFile(File indexFile) {
+    return new File(String.format("%s.tmp", indexFile.getPath()));
+  }
+
+  private void writeKey(CacheSerializer<T> serializer, DataOutput out, TimedKey key)
+      throws IOException {
+    byte[] serializeKey = serializer.serialize(key.getKey());
+    out.writeLong(key.getCreated());
+    out.writeInt(serializeKey.length);
+    out.write(serializeKey);
+  }
+
+  private TimedKey readKey(CacheSerializer<T> serializer, DataInput in) throws IOException {
+    long created = in.readLong();
+    int keySize = in.readInt();
+    byte[] serializedKey = new byte[keySize];
+    in.readFully(serializedKey);
+    T key = serializer.deserialize(serializedKey);
+    return new TimedKey(key, created);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
index 8546981..387391e 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheSerializers.java
@@ -13,10 +13,12 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.server.cache.PersistentCacheDef;
 import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import com.google.inject.Singleton;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Singleton
@@ -30,16 +32,6 @@
     registerCacheValueSerializer(cacheName, def.valueSerializer());
   }
 
-  static <K, V> void registerCacheKeySerializer(
-      String cacheName, CacheSerializer<K> keySerializer) {
-    keySerializers.computeIfAbsent(cacheName, (name) -> keySerializer);
-  }
-
-  static <K, V> void registerCacheValueSerializer(
-      String cacheName, CacheSerializer<V> valueSerializer) {
-    valueSerializers.computeIfAbsent(cacheName, (name) -> valueSerializer);
-  }
-
   @SuppressWarnings("unchecked")
   public static <K> CacheSerializer<K> getKeySerializer(String name) {
     if (keySerializers.containsKey(name)) {
@@ -55,4 +47,29 @@
     }
     throw new IllegalStateException("Could not find value serializer for " + name);
   }
+
+  @VisibleForTesting
+  static <K, V> void registerCacheKeySerializer(
+      String cacheName, CacheSerializer<K> keySerializer) {
+    keySerializers.computeIfAbsent(cacheName, (name) -> keySerializer);
+  }
+
+  @VisibleForTesting
+  static <K, V> void registerCacheValueSerializer(
+      String cacheName, CacheSerializer<V> valueSerializer) {
+    valueSerializers.computeIfAbsent(cacheName, (name) -> valueSerializer);
+  }
+
+  @VisibleForTesting
+  static Set<String> getSerializersNames() {
+    // caches registration during Gerrit's start is performed through registerCacheDef hence there
+    // is no need to check both maps for all serializers names
+    return keySerializers.keySet();
+  }
+
+  @VisibleForTesting
+  static void clear() {
+    keySerializers.clear();
+    valueSerializers.clear();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java
new file mode 100644
index 0000000..0443fc6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/CachesWithoutChronicleMapConfigMetric.java
@@ -0,0 +1,57 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.metrics.Counter0;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+@Singleton
+class CachesWithoutChronicleMapConfigMetric {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Set<String> uniqueCacheNames;
+  private final Counter0 numberOfCachesWithoutConfig;
+
+  @Inject
+  CachesWithoutChronicleMapConfigMetric(MetricMaker metricMaker) {
+    this.uniqueCacheNames = Collections.synchronizedSet(new HashSet<>());
+
+    String metricName = "cache/chroniclemap/caches_without_configuration";
+    this.numberOfCachesWithoutConfig =
+        metricMaker.newCounter(
+            metricName,
+            new Description(
+                    "The number of caches that have no chronicle map configuration provided in 'gerrit.config' and use defaults.")
+                .setUnit("caches"));
+  }
+
+  void incrementForCache(String name) {
+    if (uniqueCacheNames.add(name)) {
+      numberOfCachesWithoutConfig.increment();
+      logger.atWarning().log("Fall back to default configuration for '%s' cache", name);
+    }
+  }
+
+  Set<String> cachesOnDefaults() {
+    return uniqueCacheNames;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
index 79e4b38..e2c5afe 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfig.java
@@ -13,21 +13,31 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheFactory.PRUNE_DELAY;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import java.io.File;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Optional;
+import org.apache.commons.io.FilenameUtils;
 import org.eclipse.jgit.lib.Config;
 
 public class ChronicleMapCacheConfig {
-  private final File persistedFile;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final File cacheFile;
+  private final boolean cacheFileExists;
+  private final File indexFile;
   private final long maxEntries;
   private final long averageKeySize;
   private final long averageValueSize;
@@ -35,59 +45,74 @@
   private final Duration refreshAfterWrite;
   private final int maxBloatFactor;
   private final int percentageFreeSpaceEvictionThreshold;
-  private final int percentageHotKeys;
   private final String configKey;
+  private final Duration persistIndexEvery;
+  private final long persistIndexEveryNthPrune;
 
   public interface Factory {
     ChronicleMapCacheConfig create(
         @Assisted("ConfigKey") String configKey,
-        @Assisted File persistedFile,
+        @Assisted File cacheFile,
         @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
         @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite);
 
     ChronicleMapCacheConfig createWithValues(
         @Assisted("ConfigKey") String configKey,
-        @Assisted File persistedFile,
+        @Assisted File cacheFile,
         @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
         @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite,
         @Assisted("maxEntries") long maxEntries,
         @Assisted("avgKeySize") long avgKeySize,
         @Assisted("avgValueSize") long avgValueSize,
-        @Assisted("maxBloatFactor") int maxBloatFactor);
+        @Assisted("maxBloatFactor") int maxBloatFactor,
+        @Assisted("PersistIndexEvery") Duration persistIndexEver);
   }
 
   @AssistedInject
   ChronicleMapCacheConfig(
       @GerritServerConfig Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
       @Assisted("ConfigKey") String configKey,
-      @Assisted File persistedFile,
+      @Assisted File cacheFile,
       @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
       @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite) {
 
     this(
         cfg,
+        cachesWithoutConfigMetric,
         configKey,
-        persistedFile,
+        cacheFile,
         expireAfterWrite,
         refreshAfterWrite,
         cfg.getLong("cache", configKey, "maxEntries", Defaults.maxEntriesFor(configKey)),
         cfg.getLong("cache", configKey, "avgKeySize", Defaults.averageKeySizeFor(configKey)),
         cfg.getLong("cache", configKey, "avgValueSize", Defaults.avgValueSizeFor(configKey)),
-        cfg.getInt("cache", configKey, "maxBloatFactor", Defaults.maxBloatFactorFor(configKey)));
+        cfg.getInt("cache", configKey, "maxBloatFactor", Defaults.maxBloatFactorFor(configKey)),
+        Duration.ofSeconds(
+            cfg.getTimeUnit(
+                "cache",
+                null,
+                "persistIndexEvery",
+                Defaults.persistIndexEvery().toSeconds(),
+                SECONDS)));
   }
 
   @AssistedInject
   ChronicleMapCacheConfig(
       @GerritServerConfig Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
       @Assisted("ConfigKey") String configKey,
-      @Assisted File persistedFile,
+      @Assisted File cacheFile,
       @Nullable @Assisted("ExpireAfterWrite") Duration expireAfterWrite,
       @Nullable @Assisted("RefreshAfterWrite") Duration refreshAfterWrite,
       @Assisted("maxEntries") long maxEntries,
       @Assisted("avgKeySize") long avgKeySize,
       @Assisted("avgValueSize") long avgValueSize,
-      @Assisted("maxBloatFactor") int maxBloatFactor) {
-    this.persistedFile = persistedFile;
+      @Assisted("maxBloatFactor") int maxBloatFactor,
+      @Assisted("PersistIndexEvery") Duration persistIndexEvery) {
+    this.cacheFile = cacheFile;
+    this.cacheFileExists = cacheFile.exists();
+    this.indexFile = resolveIndexFile(cacheFile);
     this.configKey = configKey;
 
     this.maxEntries = maxEntries;
@@ -116,22 +141,30 @@
             "percentageFreeSpaceEvictionThreshold",
             Defaults.percentageFreeSpaceEvictionThreshold());
 
-    this.percentageHotKeys =
-        cfg.getInt("cache", configKey, "percentageHotKeys", Defaults.percentageHotKeys());
-
-    if (percentageHotKeys <= 0 || percentageHotKeys >= 100) {
-      throw new IllegalArgumentException("Invalid 'percentageHotKeys': should be in range [1-99]");
+    long persistIndexEverySeconds = persistIndexEvery.getSeconds();
+    if (persistIndexEverySeconds < PRUNE_DELAY) {
+      logger.atWarning().log(
+          "Configured 'persistIndexEvery' duration [%ds] is lower than minimal threshold [%ds]."
+              + " Minimal threshold will be used.",
+          persistIndexEverySeconds, PRUNE_DELAY);
+      persistIndexEverySeconds = PRUNE_DELAY;
+    } else if (persistIndexEverySeconds % PRUNE_DELAY != 0L) {
+      logger.atWarning().log(
+          "Configured 'persistIndexEvery' duration [%ds] will be rounded down to a multiple of"
+              + " [%ds]. [%ds] is the minimal 'persistIndexEvery' resolution.",
+          persistIndexEverySeconds, PRUNE_DELAY, PRUNE_DELAY);
+      persistIndexEverySeconds = (persistIndexEverySeconds / PRUNE_DELAY) * PRUNE_DELAY;
     }
+    this.persistIndexEvery = Duration.ofSeconds(persistIndexEverySeconds);
+    this.persistIndexEveryNthPrune = persistIndexEverySeconds / PRUNE_DELAY;
+
+    emitMetricForMissingConfig(cfg, cachesWithoutConfigMetric, configKey);
   }
 
   public int getPercentageFreeSpaceEvictionThreshold() {
     return percentageFreeSpaceEvictionThreshold;
   }
 
-  public int getpercentageHotKeys() {
-    return percentageHotKeys;
-  }
-
   public Duration getExpireAfterWrite() {
     return expireAfterWrite;
   }
@@ -140,12 +173,28 @@
     return refreshAfterWrite;
   }
 
+  public Duration getPersistIndexEvery() {
+    return persistIndexEvery;
+  }
+
+  public long getPersistIndexEveryNthPrune() {
+    return persistIndexEveryNthPrune;
+  }
+
   public long getMaxEntries() {
     return maxEntries;
   }
 
-  public File getPersistedFile() {
-    return persistedFile;
+  public File getCacheFile() {
+    return cacheFile;
+  }
+
+  public boolean getCacheFileExists() {
+    return cacheFileExists;
+  }
+
+  public File getIndexFile() {
+    return indexFile;
   }
 
   public long getAverageKeySize() {
@@ -164,11 +213,41 @@
     return configKey;
   }
 
+  private static void emitMetricForMissingConfig(
+      Config cfg,
+      CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetric,
+      String configKey) {
+    if (!cfg.getSubsections("cache").stream()
+        .filter(cache -> cache.equalsIgnoreCase(configKey))
+        .filter(
+            cache ->
+                isConfigured(cfg, cache, "maxEntries")
+                    || isConfigured(cfg, cache, "avgKeySize")
+                    || isConfigured(cfg, cache, "avgValueSize")
+                    || isConfigured(cfg, cache, "maxBloatFactor"))
+        .findAny()
+        .isPresent()) {
+      cachesWithoutConfigMetric.incrementForCache(configKey);
+    }
+  }
+
+  private static boolean isConfigured(Config cfg, String configKey, String name) {
+    return cfg.getString("cache", configKey, name) != null;
+  }
+
+  private static File resolveIndexFile(File persistedCacheFile) {
+    String cacheFileName = persistedCacheFile.getName();
+    String indexFileName = String.format("%s.index", FilenameUtils.getBaseName(cacheFileName));
+
+    return Path.of(persistedCacheFile.getParent()).resolve(indexFileName).toFile();
+  }
+
   private static long toSeconds(@Nullable Duration duration) {
     return duration != null ? duration.getSeconds() : 0;
   }
 
-  protected static class Defaults {
+  @Singleton
+  static class Defaults {
 
     public static final long DEFAULT_MAX_ENTRIES = 1000;
 
@@ -178,24 +257,32 @@
     public static final int DEFAULT_MAX_BLOAT_FACTOR = 1;
 
     public static final int DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD = 90;
-    public static final int DEFAULT_PERCENTAGE_HOT_KEYS = 50;
 
-    private static final ImmutableMap<String, DefaultConfig> defaultMap =
+    public static final Duration DEFAULT_PERSIST_INDEX_EVERY = Duration.ofMinutes(15);
+
+    @VisibleForTesting
+    static final ImmutableMap<String, DefaultConfig> defaultMap =
         new ImmutableMap.Builder<String, DefaultConfig>()
-            .put("web_sessions", DefaultConfig.create(45, 221, 1000, 1))
-            .put("change_notes", DefaultConfig.create(36, 10240, 1000, 3))
             .put("accounts", DefaultConfig.create(30, 256, 1000, 1))
-            .put("gerrit_file_diff", DefaultConfig.create(98, 10240, 1000, 2))
-            .put("git_file_diff", DefaultConfig.create(98, 10240, 1000, 2))
+            .put("approvals", DefaultConfig.create(103, 365, 1000, 3))
+            .put("change_kind", DefaultConfig.create(59, 26, 1000, 1))
+            .put("change_notes", DefaultConfig.create(36, 10240, 1000, 3))
+            .put("comment_context", DefaultConfig.create(80, 662, 2000, 3))
+            .put("conflicts", DefaultConfig.create(70, 16, 1000, 1))
             .put("diff_intraline", DefaultConfig.create(512, 2048, 1000, 2))
             .put("diff_summary", DefaultConfig.create(128, 2048, 1000, 1))
             .put("external_ids_map", DefaultConfig.create(128, 204800, 2, 1))
-            .put("oauth_tokens", DefaultConfig.create(8, 2048, 1000, 1))
-            .put("change_kind", DefaultConfig.create(59, 26, 1000, 1))
+            .put("gerrit_file_diff", DefaultConfig.create(342, 883, 1000, 1))
+            .put("git_file_diff", DefaultConfig.create(349, 943, 1000, 1))
+            .put("git_modified_files", DefaultConfig.create(349, 943, 1000, 1))
+            .put("git_tags", DefaultConfig.create(43, 6673, 1000, 3))
+            .put("groups_byuuid_persisted", DefaultConfig.create(64, 182, 1000, 1))
             .put("mergeability", DefaultConfig.create(79, 16, 65000, 2))
-            .put("pure_revert", DefaultConfig.create(55, 16, 1000, 1))
+            .put("modified_files", DefaultConfig.create(138, 2600, 1000, 1))
+            .put("oauth_tokens", DefaultConfig.create(8, 2048, 1000, 1))
             .put("persisted_projects", DefaultConfig.create(128, 1024, 250, 2))
-            .put("conflicts", DefaultConfig.create(70, 16, 1000, 1))
+            .put("pure_revert", DefaultConfig.create(55, 16, 1000, 1))
+            .put("web_sessions", DefaultConfig.create(45, 221, 1000, 1))
             .build();
 
     public static long averageKeySizeFor(String configKey) {
@@ -226,8 +313,8 @@
       return DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD;
     }
 
-    public static int percentageHotKeys() {
-      return DEFAULT_PERCENTAGE_HOT_KEYS;
+    public static Duration persistIndexEvery() {
+      return DEFAULT_PERSIST_INDEX_EVERY;
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
index 1161699..e4474b0 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheFactory.java
@@ -46,6 +46,8 @@
 
 @Singleton
 class ChronicleMapCacheFactory extends PersistentCacheBaseFactory implements LifecycleListener {
+  static final long PRUNE_DELAY = 30;
+
   private final ChronicleMapCacheConfig.Factory configFactory;
   private final MetricMaker metricMaker;
   private final DynamicMap<Cache<?, ?>> cacheMap;
@@ -53,6 +55,7 @@
   private final ScheduledExecutorService cleanup;
 
   private final LoggingContextAwareExecutorService storePersistenceExecutor;
+  private final LoggingContextAwareExecutorService indexPersistenceExecutor;
 
   @Inject
   ChronicleMapCacheFactory(
@@ -79,6 +82,10 @@
         new LoggingContextAwareExecutorService(
             Executors.newFixedThreadPool(
                 1, new ThreadFactoryBuilder().setNameFormat("ChronicleMap-Store-%d").build()));
+    this.indexPersistenceExecutor =
+        new LoggingContextAwareExecutorService(
+            Executors.newFixedThreadPool(
+                1, new ThreadFactoryBuilder().setNameFormat("ChronicleMap-Index-%d").build()));
   }
 
   @Override
@@ -121,7 +128,8 @@
               config,
               metricMaker,
               memLoader,
-              new InMemoryCacheLoadingFromStoreImpl<>(mem, false));
+              new InMemoryCacheLoadingFromStoreImpl<>(mem, false),
+              indexPersistenceExecutor);
 
     } catch (IOException e) {
       throw new UncheckedIOException(e);
@@ -173,7 +181,8 @@
               config,
               metricMaker,
               memLoader,
-              new InMemoryCacheLoadingFromStoreImpl<>(mem, true));
+              new InMemoryCacheLoadingFromStoreImpl<>(mem, true),
+              indexPersistenceExecutor);
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -198,13 +207,19 @@
   @Override
   public void start() {
     for (ChronicleMapCacheImpl<?, ?> cache : caches) {
-      cleanup.scheduleWithFixedDelay(cache::prune, 30, 30, TimeUnit.SECONDS);
+      cleanup.scheduleWithFixedDelay(cache::prune, PRUNE_DELAY, PRUNE_DELAY, TimeUnit.SECONDS);
     }
   }
 
   @Override
   public void stop() {
     cleanup.shutdownNow();
+    for (ChronicleMapCacheImpl<?, ?> cache : caches) {
+      cache.close();
+    }
+    caches.clear();
+    storePersistenceExecutor.shutdown();
+    indexPersistenceExecutor.shutdown();
   }
 
   public static File fileName(Path cacheDir, String name, Integer version) {
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
index b704a7b..7efc8b5 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheImpl.java
@@ -17,7 +17,6 @@
 import com.google.common.cache.CacheStats;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.cache.PersistentCache;
@@ -29,6 +28,7 @@
 import java.time.Instant;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.LongAdder;
 import net.openhft.chronicle.map.ChronicleMap;
 import net.openhft.chronicle.map.ChronicleMapBuilder;
@@ -45,10 +45,12 @@
   private final LongAdder loadSuccessCount = new LongAdder();
   private final LongAdder loadExceptionCount = new LongAdder();
   private final LongAdder totalLoadTime = new LongAdder();
-  private final InMemoryLRU<K> hotEntries;
+  private final CacheKeysIndex<K> keysIndex;
   private final PersistentCacheDef<K, V> cacheDefinition;
   private final ChronicleMapCacheLoader<K, V> memLoader;
   private final InMemoryCache<K, V> mem;
+  private final Executor indexPersistenceExecutor;
+  private long pruneCount;
 
   ChronicleMapCacheImpl(PersistentCacheDef<K, V> def, ChronicleMapCacheConfig config)
       throws IOException {
@@ -56,17 +58,15 @@
 
     this.cacheDefinition = def;
     this.config = config;
-    this.hotEntries =
-        new InMemoryLRU<>(
-            (int) Math.max(config.getMaxEntries() * config.getpercentageHotKeys() / 100, 1));
+    this.keysIndex =
+        new CacheKeysIndex<>(
+            metricMaker, def.name(), config.getIndexFile(), config.getCacheFileExists());
     this.store = createOrRecoverStore(def, config, metricMaker);
     this.memLoader =
         new ChronicleMapCacheLoader<>(
             MoreExecutors.directExecutor(), store, config.getExpireAfterWrite());
     this.mem = memLoader.asInMemoryCacheBypass();
-
-    ChronicleMapCacheMetrics metrics = new ChronicleMapCacheMetrics(metricMaker);
-    metrics.registerCallBackMetrics(def.name(), this);
+    this.indexPersistenceExecutor = MoreExecutors.directExecutor();
   }
 
   ChronicleMapCacheImpl(
@@ -74,19 +74,18 @@
       ChronicleMapCacheConfig config,
       MetricMaker metricMaker,
       ChronicleMapCacheLoader<K, V> memLoader,
-      InMemoryCache<K, V> mem) {
+      InMemoryCache<K, V> mem,
+      Executor indexPersistenceExecutor) {
 
     this.cacheDefinition = def;
     this.config = config;
-    this.hotEntries =
-        new InMemoryLRU<>(
-            (int) Math.max(config.getMaxEntries() * config.getpercentageHotKeys() / 100, 1));
+    this.keysIndex =
+        new CacheKeysIndex<>(
+            metricMaker, def.name(), config.getIndexFile(), config.getCacheFileExists());
     this.memLoader = memLoader;
     this.mem = mem;
     this.store = memLoader.getStore();
-
-    ChronicleMapCacheMetrics metrics = new ChronicleMapCacheMetrics(metricMaker);
-    metrics.registerCallBackMetrics(def.name(), this);
+    this.indexPersistenceExecutor = indexPersistenceExecutor;
   }
 
   @SuppressWarnings({"unchecked", "cast", "rawtypes"})
@@ -111,7 +110,9 @@
     }
 
     mapBuilder.averageValueSize(config.getAverageValueSize());
-    mapBuilder.valueMarshaller(new TimedValueMarshaller<>(def.name()));
+
+    TimedValueMarshaller<V> valueMarshaller = new TimedValueMarshaller<>(metricMaker, def.name());
+    mapBuilder.valueMarshaller(valueMarshaller);
 
     mapBuilder.entries(config.getMaxEntries());
 
@@ -123,12 +124,12 @@
             + "a function of the number of entries in the cache",
         def.diskLimit(), def.name());
     ChronicleMap<KeyWrapper<K>, TimedValue<V>> store =
-        mapBuilder.createOrRecoverPersistedTo(config.getPersistedFile());
+        mapBuilder.createOrRecoverPersistedTo(config.getCacheFile());
 
     logger.atInfo().log(
         "Initialized '%s'|version: %s|avgKeySize: %s bytes|avgValueSize:"
             + " %s bytes|entries: %s|maxBloatFactor: %s|remainingAutoResizes:"
-            + " %s|percentageFreeSpace: %s",
+            + " %s|percentageFreeSpace: %s|persistIndexEvery: %s",
         def.name(),
         def.version(),
         mapBuilder.constantlySizedKeys() ? "CONSTANT" : config.getAverageKeySize(),
@@ -136,45 +137,22 @@
         config.getMaxEntries(),
         config.getMaxBloatFactor(),
         store.remainingAutoResizes(),
-        store.percentageFreeSpace());
+        store.percentageFreeSpace(),
+        config.getPersistIndexEvery());
 
-    return new ChronicleMapStore<>(store, config, metricMaker);
+    return new ChronicleMapStore<K, V>(store, config, metricMaker) {
+      @Override
+      public void close() {
+        super.close();
+        valueMarshaller.close();
+      }
+    };
   }
 
   protected PersistentCacheDef<K, V> getCacheDefinition() {
     return cacheDefinition;
   }
 
-  private static class ChronicleMapCacheMetrics {
-
-    private final MetricMaker metricMaker;
-
-    ChronicleMapCacheMetrics(MetricMaker metricMaker) {
-      this.metricMaker = metricMaker;
-    }
-
-    <K, V> void registerCallBackMetrics(String name, ChronicleMapCacheImpl<K, V> cache) {
-      String sanitizedName = metricMaker.sanitizeMetricName(name);
-      String HOT_KEYS_CAPACITY_METRIC = "cache/chroniclemap/hot_keys_capacity_" + sanitizedName;
-      String HOT_KEYS_SIZE_METRIC = "cache/chroniclemap/hot_keys_size_" + sanitizedName;
-
-      metricMaker.newConstantMetric(
-          HOT_KEYS_CAPACITY_METRIC,
-          cache.hotEntries.getCapacity(),
-          new Description(
-              String.format(
-                  "The number of hot cache keys for %s cache that can be kept in memory", name)));
-
-      metricMaker.newCallbackMetric(
-          HOT_KEYS_SIZE_METRIC,
-          Integer.class,
-          new Description(
-              String.format(
-                  "The number of hot cache keys for %s cache that are currently in memory", name)),
-          cache.hotEntries::size);
-    }
-  }
-
   public ChronicleMapCacheConfig getConfig() {
     return config;
   }
@@ -187,6 +165,7 @@
       return null;
     }
 
+    keysIndex.add(objKey, timedValue.getCreated());
     return timedValue.getValue();
   }
 
@@ -199,6 +178,9 @@
       if (needsRefresh(valueHolder.getCreated())) {
         store.remove(keyWrapper);
         mem.refresh(key);
+        keysIndex.refresh(key);
+      } else {
+        keysIndex.add(key, valueHolder.getCreated());
       }
       return valueHolder.getValue();
     }
@@ -211,11 +193,12 @@
   @Override
   public V get(K key, Callable<? extends V> valueLoader) throws ExecutionException {
     try {
-      return mem.get(key, () -> getFromStore(key, valueLoader)).getValue();
+      TimedValue<V> value = mem.get(key, () -> getFromStore(key, valueLoader));
+      keysIndex.add(key, value.getCreated());
+      return value.getValue();
+    } catch (ExecutionException e) {
+      throw e;
     } catch (Exception e) {
-      if (e instanceof ExecutionException) {
-        throw (ExecutionException) e;
-      }
       throw new ExecutionException(e);
     }
   }
@@ -259,6 +242,7 @@
     KeyWrapper<?> wrappedKey = new KeyWrapper<>(key);
     if (store.tryPut((KeyWrapper<K>) wrappedKey, (TimedValue<V>) wrappedValue)) {
       mem.put((K) key, (TimedValue<V>) wrappedValue);
+      keysIndex.add(key, wrappedValue.getCreated());
     }
   }
 
@@ -276,6 +260,7 @@
   public void putUnchecked(KeyWrapper<Object> wrappedKey, TimedValue<Object> wrappedValue) {
     if (store.tryPut((KeyWrapper<K>) wrappedKey, (TimedValue<V>) wrappedValue)) {
       mem.put((K) wrappedKey.getValue(), (TimedValue<V>) wrappedValue);
+      keysIndex.add(wrappedKey.getValue(), wrappedValue.getCreated());
     }
   }
 
@@ -284,32 +269,29 @@
     TimedValue<V> timedVal = new TimedValue<>(val);
     if (putTimedToStore(key, timedVal)) {
       mem.put(key, timedVal);
+      keysIndex.add(key, timedVal.getCreated());
     }
   }
 
   boolean putTimedToStore(K key, TimedValue<V> timedVal) {
     KeyWrapper<K> wrappedKey = new KeyWrapper<>(key);
-    boolean putSuccess = store.tryPut(wrappedKey, timedVal);
-    if (putSuccess) {
-      hotEntries.add(key);
-    }
-    return putSuccess;
+    return store.tryPut(wrappedKey, timedVal);
   }
 
   public void prune() {
     if (!config.getExpireAfterWrite().isZero()) {
-      store.forEachEntry(
-          c -> {
-            if (memLoader.expired(c.value().get().getCreated())) {
-              hotEntries.remove(c.key().get().getValue());
-              c.context().remove(c);
-            }
-          });
+      long expirationTime = System.currentTimeMillis() - config.getExpireAfterWrite().toMillis();
+      keysIndex.removeAndConsumeKeysOlderThan(
+          expirationTime, key -> store.remove(new KeyWrapper<>(key)));
     }
 
     if (runningOutOfFreeSpace()) {
       evictColdEntries();
     }
+
+    if (++pruneCount % config.getPersistIndexEveryNthPrune() == 0L) {
+      indexPersistenceExecutor.execute(keysIndex::persist);
+    }
   }
 
   private boolean needsRefresh(long created) {
@@ -324,13 +306,8 @@
   }
 
   private void evictColdEntries() {
-    store.forEachEntryWhile(
-        e -> {
-          if (!hotEntries.contains(e.key().get().getValue())) {
-            e.doRemove();
-          }
-          return runningOutOfFreeSpace();
-        });
+    while (runningOutOfFreeSpace()
+        && keysIndex.removeAndConsumeLruKey(key -> store.remove(new KeyWrapper<>(key)))) ;
   }
 
   @SuppressWarnings("unchecked")
@@ -339,14 +316,14 @@
     KeyWrapper<K> wrappedKey = (KeyWrapper<K>) new KeyWrapper<>(key);
     store.remove(wrappedKey);
     mem.invalidate(key);
-    hotEntries.remove((K) key);
+    keysIndex.invalidate(key);
   }
 
   @Override
   public void invalidateAll() {
     store.clear();
-    hotEntries.invalidateAll();
     mem.invalidateAll();
+    keysIndex.clear();
   }
 
   ChronicleMap<KeyWrapper<K>, TimedValue<V>> getStore() {
@@ -367,7 +344,7 @@
   public DiskStats diskStats() {
     return new DiskStats(
         store.longSize(),
-        config.getPersistedFile().length(),
+        config.getCacheFile().length(),
         hitCount.longValue(),
         missCount.longValue());
   }
@@ -378,6 +355,7 @@
 
   public void close() {
     store.close();
+    keysIndex.close();
   }
 
   public double percentageUsedAutoResizes() {
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
index 99fe5c1..a9b3856 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheModule.java
@@ -26,5 +26,6 @@
     factory(ChronicleMapCacheConfig.Factory.class);
     bind(PersistentCacheFactory.class).to(ChronicleMapCacheFactory.class);
     listener().to(ChronicleMapCacheFactory.class);
+    bind(CachesWithoutChronicleMapConfigMetric.class).asEagerSingleton();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
index 0419ca4..38759f4 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStore.java
@@ -320,6 +320,7 @@
   @Override
   public void close() {
     store.close();
+    metrics.close();
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
index cf758b5..c27be99 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapStoreMetrics.java
@@ -14,15 +14,19 @@
 
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.extensions.registration.RegistrationHandle;
 import com.google.gerrit.metrics.Counter0;
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.MetricMaker;
+import java.util.HashSet;
+import java.util.Set;
 
 class ChronicleMapStoreMetrics {
   private final String sanitizedName;
   private final MetricMaker metricMaker;
   private final String name;
   private final Counter0 storePutFailures;
+  private final Set<RegistrationHandle> callbacks;
 
   ChronicleMapStoreMetrics(String name, MetricMaker metricMaker) {
     this.name = name;
@@ -37,6 +41,7 @@
                         + name)
                 .setCumulative()
                 .setUnit("errors"));
+    this.callbacks = new HashSet<>(3, 1.0F);
   }
 
   void incrementPutFailures() {
@@ -50,27 +55,37 @@
         "cache/chroniclemap/remaining_autoresizes_" + sanitizedName;
     String MAX_AUTORESIZES_METRIC = "cache/chroniclemap/max_autoresizes_" + sanitizedName;
 
-    metricMaker.newCallbackMetric(
-        PERCENTAGE_FREE_SPACE_METRIC,
-        Long.class,
-        new Description(
-            String.format("The amount of free space in the %s cache as a percentage", name)),
-        () -> (long) store.percentageFreeSpace());
+    callbacks.add(
+        metricMaker.newCallbackMetric(
+            PERCENTAGE_FREE_SPACE_METRIC,
+            Long.class,
+            new Description(
+                String.format("The amount of free space in the %s cache as a percentage", name)),
+            () -> (long) store.percentageFreeSpace()));
 
-    metricMaker.newCallbackMetric(
-        REMAINING_AUTORESIZES_METRIC,
-        Integer.class,
-        new Description(
-            String.format(
-                "The number of times the %s cache can automatically expand its capacity", name)),
-        store::remainingAutoResizes);
+    callbacks.add(
+        metricMaker.newCallbackMetric(
+            REMAINING_AUTORESIZES_METRIC,
+            Integer.class,
+            new Description(
+                String.format(
+                    "The number of times the %s cache can automatically expand its capacity",
+                    name)),
+            store::remainingAutoResizes));
 
-    metricMaker.newConstantMetric(
-        MAX_AUTORESIZES_METRIC,
-        store.maxAutoResizes(),
-        new Description(
-            String.format(
-                "The maximum number of times the %s cache can automatically expand its capacity",
-                name)));
+    callbacks.add(
+        metricMaker.newConstantMetric(
+            MAX_AUTORESIZES_METRIC,
+            store.maxAutoResizes(),
+            new Description(
+                String.format(
+                    "The maximum number of times the %s cache can automatically expand its capacity",
+                    name))));
+  }
+
+  void close() {
+    storePutFailures.remove();
+    callbacks.forEach(RegistrationHandle::remove);
+    callbacks.clear();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
index c74594f..287fe72 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/H2MigrationServlet.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.persistIndexEvery;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.H2_SUFFIX;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.getStats;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.H2CacheCommand.jdbcUrl;
@@ -300,7 +301,8 @@
         stats.size() * sizeMultiplier,
         stats.avgKeySize(),
         stats.avgValueSize(),
-        maxBloatFactor);
+        maxBloatFactor,
+        persistIndexEvery());
   }
 
   private void doMigrate(
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java
deleted file mode 100644
index dcf7a68..0000000
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRU.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.modules.cache.chroniclemap;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class InMemoryLRU<K> {
-
-  private final Map<K, Boolean> LRUMap;
-
-  private static final Boolean dummyValue = Boolean.TRUE;
-  private final int capacity;
-
-  public InMemoryLRU(int capacity) {
-    this.capacity = capacity;
-
-    LRUMap =
-        Collections.synchronizedMap(
-            new LinkedHashMap<K, Boolean>(capacity, 0.75f, true) {
-              private static final long serialVersionUID = 1L;
-
-              @Override
-              protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
-                return size() > capacity;
-              }
-            });
-  }
-
-  public void add(K objKey) {
-    LRUMap.putIfAbsent(objKey, dummyValue);
-  }
-
-  public boolean contains(K key) {
-    return LRUMap.containsKey(key);
-  }
-
-  /**
-   * Remove a key from the map
-   *
-   * @param key element to remove from the map
-   * @return true when key was in the map, null otherwise
-   */
-  public Boolean remove(K key) {
-    return LRUMap.remove(key);
-  }
-
-  public void invalidateAll() {
-    LRUMap.clear();
-  }
-
-  public int size() {
-    return LRUMap.size();
-  }
-
-  @VisibleForTesting
-  protected Object[] toArray() {
-    return LRUMap.keySet().toArray();
-  }
-
-  public int getCapacity() {
-    return capacity;
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
index 53de07c..1998c3d 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/KeyWrapperMarshaller.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import net.openhft.chronicle.bytes.Bytes;
 import net.openhft.chronicle.core.util.ReadResolvable;
 import net.openhft.chronicle.hash.serialization.BytesReader;
@@ -24,9 +25,11 @@
         ReadResolvable<KeyWrapperMarshaller<V>> {
 
   private final String name;
+  private final CacheSerializer<V> cacheSerializer;
 
   KeyWrapperMarshaller(String name) {
     this.name = name;
+    this.cacheSerializer = CacheSerializers.getKeySerializer(name);
   }
 
   @Override
@@ -34,13 +37,13 @@
     return new KeyWrapperMarshaller<>(name);
   }
 
-  @SuppressWarnings({"unchecked", "rawtypes"})
+  @SuppressWarnings("rawtypes")
   @Override
   public KeyWrapper<V> read(Bytes in, KeyWrapper<V> using) {
     int serializedLength = (int) in.readUnsignedInt();
     byte[] serialized = new byte[serializedLength];
     in.read(serialized, 0, serializedLength);
-    V v = (V) CacheSerializers.getKeySerializer(name).deserialize(serialized);
+    V v = cacheSerializer.deserialize(serialized);
     using = new KeyWrapper<>(v);
 
     return using;
@@ -49,7 +52,7 @@
   @SuppressWarnings("rawtypes")
   @Override
   public void write(Bytes out, KeyWrapper<V> toWrite) {
-    final byte[] serialized = CacheSerializers.getKeySerializer(name).serialize(toWrite.getValue());
+    final byte[] serialized = cacheSerializer.serialize(toWrite.getValue());
     out.writeUnsignedInt(serialized.length);
     out.write(serialized);
   }
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java
new file mode 100644
index 0000000..098e8c6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/SerializationMetricsForCache.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Description.Units;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Metrics contains relations with objects created upon Gerrit start and as such it cannot be
+ * serialized. Upon the situation when store is being restored from a persistent file Marshaller
+ * gets deserialised and its transient metrics value needs to be re-initialised with object that was
+ * created upon the Gerrit start. Note that Marshaller creation is a step that prepends the cache
+ * build therefore it is ensured that metrics object is always available for cache's Marshaller.
+ */
+class SerializationMetricsForCache {
+  protected static class Metrics {
+    final Timer0 deserializeLatency;
+    final Timer0 serializeLatency;
+
+    private Metrics(MetricMaker metricMaker, String cacheName) {
+      String sanitizedName = metricMaker.sanitizeMetricName(cacheName);
+
+      deserializeLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/store_deserialize_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of deserializing entries from chronicle-map store for %s"
+                              + " cache",
+                          cacheName))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+
+      serializeLatency =
+          metricMaker.newTimer(
+              "cache/chroniclemap/store_serialize_latency_" + sanitizedName,
+              new Description(
+                      String.format(
+                          "The latency of serializing entries to chronicle-map store for %s cache",
+                          cacheName))
+                  .setCumulative()
+                  .setUnit(Units.NANOSECONDS));
+    }
+  }
+
+  private static final Map<String, Metrics> metricsCache = new HashMap<>();
+
+  protected final String name;
+  protected final transient Metrics metrics;
+
+  protected SerializationMetricsForCache(MetricMaker metricMaker, String name) {
+    this(createMetrics(metricMaker, name), name);
+  }
+
+  protected SerializationMetricsForCache(Metrics metrics, String name) {
+    this.metrics = Optional.ofNullable(metrics).orElseGet(() -> metricsCache.get(name));
+    this.name = name;
+  }
+
+  private static Metrics createMetrics(MetricMaker metricMaker, String name) {
+    Metrics metrics = new Metrics(metricMaker, name);
+    metricsCache.put(name, metrics);
+    return metrics;
+  }
+
+  void close() {
+    metrics.deserializeLatency.remove();
+    metrics.serializeLatency.remove();
+    metricsCache.remove(name);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
index 6bb53bc..e6ccda2 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValue.java
@@ -16,9 +16,8 @@
 import com.google.common.base.Objects;
 
 public class TimedValue<V> {
-
-  private final V value;
-  private final long created;
+  private V value;
+  private long created;
 
   TimedValue(V value) {
     this.created = System.currentTimeMillis();
@@ -34,10 +33,18 @@
     return created;
   }
 
+  void setCreated(long created) {
+    this.created = created;
+  }
+
   public V getValue() {
     return value;
   }
 
+  void setValue(V value) {
+    this.value = value;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
index f85f57e..d3dd713 100644
--- a/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
+++ b/src/main/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshaller.java
@@ -13,78 +13,85 @@
 // limitations under the License.
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer0;
+import com.google.gerrit.server.cache.serialize.CacheSerializer;
 import java.nio.ByteBuffer;
 import net.openhft.chronicle.bytes.Bytes;
 import net.openhft.chronicle.core.util.ReadResolvable;
 import net.openhft.chronicle.hash.serialization.BytesReader;
 import net.openhft.chronicle.hash.serialization.BytesWriter;
 
-public class TimedValueMarshaller<V>
+public class TimedValueMarshaller<V> extends SerializationMetricsForCache
     implements BytesWriter<TimedValue<V>>,
         BytesReader<TimedValue<V>>,
         ReadResolvable<TimedValueMarshaller<V>> {
 
-  private final String name;
+  private static final ThreadLocal<byte[]> staticBuffer =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return new byte[Long.BYTES + Integer.BYTES];
+        }
+      };
 
-  TimedValueMarshaller(String name) {
-    this.name = name;
+  private final CacheSerializer<V> cacheSerializer;
+
+  TimedValueMarshaller(MetricMaker metricMaker, String name) {
+    super(metricMaker, name);
+    this.cacheSerializer = CacheSerializers.getValueSerializer(name);
+  }
+
+  private TimedValueMarshaller(Metrics metrics, String name) {
+    super(metrics, name);
+    this.cacheSerializer = CacheSerializers.getValueSerializer(name);
   }
 
   @Override
   public TimedValueMarshaller<V> readResolve() {
-    return new TimedValueMarshaller<>(name);
+    return new TimedValueMarshaller<>(metrics, name);
   }
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
+  @SuppressWarnings("rawtypes")
   @Override
   public TimedValue<V> read(Bytes in, TimedValue<V> using) {
-    long initialPosition = in.readPosition();
+    try (Timer0.Context timer = metrics.deserializeLatency.start()) {
+      byte[] bytesBuffer = staticBuffer.get();
+      in.read(bytesBuffer);
+      ByteBuffer buffer = ByteBuffer.wrap(bytesBuffer);
+      long created = buffer.getLong();
+      int vLength = buffer.getInt();
 
-    // Deserialize the creation timestamp (first 8 bytes)
-    byte[] serializedLong = new byte[Long.BYTES];
-    in.read(serializedLong, 0, Long.BYTES);
-    ByteBuffer buffer = ByteBuffer.wrap(serializedLong);
-    long created = buffer.getLong(0);
-    in.readPosition(initialPosition + Long.BYTES);
+      // Deserialize object V (remaining bytes)
+      byte[] serializedV = new byte[vLength];
+      in.read(serializedV);
+      V v = cacheSerializer.deserialize(serializedV);
 
-    // Deserialize the length of the serialized value (second 8 bytes)
-    byte[] serializedInt = new byte[Integer.BYTES];
-    in.read(serializedInt, 0, Integer.BYTES);
-    ByteBuffer buffer2 = ByteBuffer.wrap(serializedInt);
-    int vLength = buffer2.getInt(0);
-    in.readPosition(initialPosition + Long.BYTES + Integer.BYTES);
-
-    // Deserialize object V (remaining bytes)
-    byte[] serializedV = new byte[vLength];
-    in.read(serializedV, 0, vLength);
-    V v = (V) CacheSerializers.getValueSerializer(name).deserialize(serializedV);
-
-    using = new TimedValue<>(v, created);
-
-    return using;
+      if (using == null) {
+        using = new TimedValue<>(v, created);
+      } else {
+        using.setCreated(created);
+        using.setValue(v);
+      }
+      return using;
+    }
   }
 
   @SuppressWarnings("rawtypes")
   @Override
   public void write(Bytes out, TimedValue<V> toWrite) {
-    byte[] serialized = CacheSerializers.getValueSerializer(name).serialize(toWrite.getValue());
+    try (Timer0.Context timer = metrics.serializeLatency.start()) {
+      byte[] serialized = cacheSerializer.serialize(toWrite.getValue());
 
-    // Serialize as follows:
-    // created | length of serialized V | serialized value V
-    // 8 bytes |       4 bytes          | serialized_length bytes
-
-    int capacity = Long.BYTES + Integer.BYTES + serialized.length;
-    ByteBuffer buffer = ByteBuffer.allocate(capacity);
-
-    long timestamp = toWrite.getCreated();
-    buffer.putLong(0, timestamp);
-
-    buffer.position(Long.BYTES);
-    buffer.putInt(serialized.length);
-
-    buffer.position(Long.BYTES + Integer.BYTES);
-    buffer.put(serialized);
-
-    out.write(buffer.array());
+      // Serialize as follows:
+      // created | length of serialized V | serialized value V
+      // 8 bytes |       4 bytes          | serialized_length bytes
+      byte[] bytesBuffer = staticBuffer.get();
+      ByteBuffer buffer = ByteBuffer.wrap(bytesBuffer);
+      buffer.putLong(toWrite.getCreated());
+      buffer.putInt(serialized.length);
+      out.write(bytesBuffer);
+      out.write(serialized);
+    }
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 60d4ae1..f554b6b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -90,6 +90,26 @@
 
 Default: *90*
 
+* `cache.persistIndexEvery`
+: Duration (in seconds if not specified) between caches keys index persist
+operations.
+
+Note that in order to avoid race condition between evict and persist operations
+the latter is performed after the former is finished. Therefore the lowest
+persist resolution is 30s. Smaller values will be rounded up to 30s whereas
+higher values will be rounded down (if needed) to the closest multiple of 30s.
+From practical point of view it means that persist operation will be performed
+after every n-th evict operation is finished.
+For instance if `cache.persistIndexEvery = 2m` then persist will be called
+after every 4th eviction is finished.
+
+Notes:
+* regardless of `cache.persistIndexEvery` persist will be called on
+  Gerrit termination unless there is one already in progress.
+* persist operation is scheduled in a dedicated thread ('ChronicleMap-Index-0')
+
+Default: *15m*
+
 ### Defaults
 
 Unless overridden by configuration, sensible default values are be provided for
@@ -128,24 +148,6 @@
  If the cache expands you will see an increase in the available free space.
 [official documentation](https://javadoc.io/static/net.openhft/chronicle-map/3.20.83/net/openhft/chronicle/map/ChronicleMap.html#percentageFreeSpace--)
 
-* `percentageHotKeys`
-: The percentage of _hot_ keys that can be kept in-memory.
-When performing evictions, _hot_ keys will be preserved and only _cold_ keys
-will be evicted from chronicle-map, in random order.
-
-This value implies a trade-off between eviction speed and eviction accuracy.
-
-The smaller the number of hotKeys allocated, the quicker the eviction phase
-will be. However, this will increase the chance of evicting entries that were
-recently accessed.
-
-Conversely, the higher the number of hotKeys allocated, the higher will be the
-accuracy in evicting only recently accessed keys, at the price of a longer
-time spent doing evictions.
-
-In order to ensure there is always a cold entry to be evicted, the number of
-`percentageHotKeys` always needs to be less than `maxEntries`.
-
 *Constraints*: [1-99]
 *Default*: 50
 
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index 2928708..0dca368 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -20,11 +20,39 @@
 * cache/chroniclemap/max_autoresizes_<cache-name>
   : The maximum number of times the cache can automatically expand its capacity.
 
-* cache/chroniclemap/hot_keys_capacity_<cache-name>
-  : Constant number of hot keys for the cache that can be kept in memory.
+* cache/chroniclemap/keys_index_size_<cache-name>
+  : The number of index keys for the cache that are currently in memory.
 
-* cache/chroniclemap/hot_keys_size_<cache-name>
-  : The number of hot keys for the cache that are currently in memory.
+* cache/chroniclemap/keys_index_add_latency_<cache-name>
+  : The latency of adding cache key to an index.
 
-* "cache/chroniclemap/store_put_failures_<cache-name>
-  : The number of errors caught when inserting entries in chronicle-map store
\ No newline at end of file
+* cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_<cache-name>
+  : The latency of removing and consuming all keys older than expiration time for an index.
+
+* cache/chroniclemap/keys_index_remove_lru_key_latency_<cache-name>
+  : The latency of removing and consuming LRU key from an index.
+
+* cache/chroniclemap/keys_index_restore_latency_<cache-name>
+  : The latency of restoring an index from a file (performed once during the plugin start).
+
+* cache/chroniclemap/keys_index_persist_latency_<cache-name>
+  : The latency of persisting an index to a file.
+
+* cache/chroniclemap/store_serialize_latency_<cache-name>
+  : The latency of serializing entries in chronicle-map store
+
+* cache/chroniclemap/store_deserialize_latency_<cache-name>
+  : The latency of deserializing entries from chronicle-map store
+
+* cache/chroniclemap/store_put_failures_<cache-name>
+  : The number of errors caught when inserting entries in chronicle-map store
+
+* cache/chroniclemap/keys_index_restore_failures_<cache-name>
+  : The number of errors caught when restore cache index from file operation was performed
+
+* cache/chroniclemap/keys_index_persist_failures_<cache-name>
+  : The number of errors caught when persist cache index to file operation was performed
+
+* cache/chroniclemap/caches_without_configuration
+  : The number of caches that have no chronicle map configuration provided in `gerrit.config`
+    and fall back to defaults
diff --git a/src/main/resources/Documentation/migration.md b/src/main/resources/Documentation/migration.md
index 261f08f..f0aed00 100644
--- a/src/main/resources/Documentation/migration.md
+++ b/src/main/resources/Documentation/migration.md
@@ -103,4 +103,45 @@
 
 * size-multiplier=MULTIPLIER
 Multiplicative factor for the number of entries allowed in chronicle-map.
-*default:3*
\ No newline at end of file
+*default:3*
+
+
+### Reloading plugins with persistent caches backed by chroniclemap
+
+When chroniclemap store is initiated for a cache it locks exclusively the
+underlying file and keeps it until store is closed. Store is closed when Gerrit
+is stopped (for core caches) or when plugin is unloaded through either REST or
+SSH command. The later is problematic from the plugin `reload` command
+perspective as by default it unloads old version of plugin once new version is
+successfully loaded. Considering that old version holds the lock until it gets
+unloaded new version load will not succeed. As a result the following (or
+similar) error is visible in the log:
+
+```
+[2022-08-31T17:37:56.481+02:00] [SSH gerrit plugin reload test-cache-plugin (admin)] WARN  com.google.gerrit.server.plugins.PluginLoader : Cannot reload plugin test-cache-plugin
+com.google.inject.CreationException: Unable to create injector, see the following errors:
+
+1) [Guice/ErrorInCustomProvider]: ChronicleHashRecoveryFailedException: ChronicleFileLockException: Unable to acquire an exclusive file lock for gerrit/cache/test-cache-plugin.test_cache_0.dat. Make sure no other process is using the map.
+  at CacheModule.bindCache(CacheModule.java:188)
+      \_ installed by: Module -> TestCache$1
+  while locating Cache<String, String> annotated with @Named(value="test_cache")
+
+Learn more:
+  https://github.com/google/guice/wiki/ERROR_IN_CUSTOM_PROVIDER
+Caused by: ChronicleHashRecoveryFailedException: ChronicleFileLockException: Unable to acquire an exclusive file lock for gerrit/cache/test-cache-plugin.test_cache_0.dat. Make sure no other process is using the map.
+  at ChronicleMapBuilder.openWithExistingFile(ChronicleMapBuilder.java:1937)
+  at ChronicleMapBuilder.createWithFile(ChronicleMapBuilder.java:1706)
+  at ChronicleMapBuilder.recoverPersistedTo(ChronicleMapBuilder.java:1622)
+  ...
+```
+
+The following steps can be used in order to perform reload operation:
+
+1. perform the plugin `reload` in two steps by calling `remove` first and
+   following it with `add` command - the easiest way that doesn't require any
+   code modification
+
+2. add `Gerrit-ReloadMode: restart` to plugin's manifest so the when the plugin
+   `reload` command is called Gerrit unloads the old version prior loading the
+   new one - requires plugin's sources modification and build which might be
+   not an option in certain cases
\ No newline at end of file
diff --git a/src/main/resources/Documentation/tuning.md b/src/main/resources/Documentation/tuning.md
index 77078a4..7abb4d0 100644
--- a/src/main/resources/Documentation/tuning.md
+++ b/src/main/resources/Documentation/tuning.md
@@ -148,7 +148,7 @@
 * You can now run an the tuning command:
 
 ```bash
-ssh -p 29418 admin@<gerrit-server> cache-chroniclemap auto-adjust-caches [--dry-run] [cache-name]
+ssh -p 29418 admin@<gerrit-server> cache-chroniclemap auto-adjust-caches [--dry-run] [--adjust-caches-on-defaults] [cache-name]
 ```
 
 * You can also use the REST-API:
@@ -180,6 +180,18 @@
 to increase the size of a particular cache only you should be using this
 together with the `cache-name` parameter.
 
+* `--adjust-caches-on-defaults` or `-a` (SSH), `?adjust-caches-on-defaults` or `?a` (REST-API)
+  optional parameter
+
+The alternative to `cache-name` parameter that allows to tune all caches that are
+not configured in `gerrit.config` and thus are relying on defaults.
+Gerrit's admin can monitor `cache/chroniclemap/caches_without_configuration` metric
+(as described in
+[Detect caches that rely on defaults](#detect-caches-that-rely-on-defaults)
+section) to detect caches that rely on those defaults.
+Note that running `adjust-caches-on-defaults` in case when all caches are explicitly
+configured doesn't result in any tuning being performed.
+
 * `cache-name` (SSH), `?CACHE_NAME=cache-name` (REST-API) optional restriction of the caches
   to analyze and auto-tune. The parameter can be repeated multiple times for analyzing
   multiple caches. By default, analyze and adjust all persistent caches.
@@ -316,4 +328,19 @@
 4. restart gerrit-2
 
 Once you have tested gerrit-2 and you are happy with the results you can perform
-steps *1.* to *4.* for `gerrit-1`.
\ No newline at end of file
+steps *1.* to *4.* for `gerrit-1`.
+
+## Detect caches that rely on defaults
+
+Gerrit increments `cache/chroniclemap/caches_without_configuration`
+metric when persistent cache that has no configuration in `gerrit.config` IOW
+(relies on fallback defaults) is instantiated.
+
+*Notes:*
+* Additionally a warning is issued to the `error_log` but Gerrit's
+  core caches are initiated earlier than logging subsystem hence seeing them
+  requires starting the server in `run` mode.
+* One can use `auto-adjust-caches` command with `adjust-caches-on-defaults` option
+  (see the [auto-adjust-chronicle-map-caches](#auto-adjust-chronicle-map-caches)
+  section for details) in order to provide explicit defaults for caches in
+  question.
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
index 776ee45..5e4fe22 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/AutoAdjustCachesIT.java
@@ -301,6 +301,23 @@
         .hasSize(1);
   }
 
+  @Test
+  public void shouldAdjustCachesOnDefaultsWhenSelected() throws Exception {
+    assertThat(
+            Joiner.on('\n').join(tunedFileNamesSet((n) -> n.contains(TEST_CACHE_FILENAME_TUNED))))
+        .isEmpty();
+
+    testCache.get(TEST_CACHE_KEY_100_CHARS);
+    String tuneResult = adminSshSession.exec(SSH_CMD + " --adjust-caches-on-defaults");
+    adminSshSession.assertSuccess();
+
+    assertThat(configResult(tuneResult, CONFIG_HEADER).getSubsections("cache"))
+        .contains(TEST_CACHE_NAME);
+    assertThat(
+            Joiner.on('\n').join(tunedFileNamesSet((n) -> n.contains(TEST_CACHE_FILENAME_TUNED))))
+        .isNotEmpty();
+  }
+
   private Config configResult(String result, @Nullable String configHeader)
       throws ConfigInvalidException {
     Config configResult = new Config();
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java
new file mode 100644
index 0000000..c8a8b66
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/CacheKeysIndexTest.java
@@ -0,0 +1,150 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.CacheKeysIndex.tempIndexFile;
+import static java.util.stream.Collectors.toList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class CacheKeysIndexTest {
+  private static final String CACHE_NAME = "test-cache";
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private CacheKeysIndex<String> index;
+  private File indexFile;
+
+  @Before
+  public void setup() throws IOException {
+    CacheSerializers.registerCacheKeySerializer(CACHE_NAME, StringCacheSerializer.INSTANCE);
+    indexFile = temporaryFolder.newFolder().toPath().resolve("cache.index").toFile();
+    index = new CacheKeysIndex<>(new DisabledMetricMaker(), CACHE_NAME, indexFile, false);
+  }
+
+  @Test
+  public void add_shouldUpdateElementPositionWhenAlreadyInSet() {
+    index.add("foo", 2L);
+    index.add("bar", 1L);
+    assertThat(keys(index)).containsExactly("foo", "bar");
+
+    index.add("foo", 1L);
+    assertThat(keys(index)).containsExactly("bar", "foo");
+  }
+
+  @Test
+  public void add_shouldUpdateElementInsertionTimeWhenNewerGetsAdded() {
+    index.add("foo", 1L);
+    index.add("foo", 2L);
+
+    CacheKeysIndex<String>.TimedKey key = index.keys().iterator().next();
+    assertThat(key.getKey()).isEqualTo("foo");
+    assertThat(key.getCreated()).isEqualTo(2L);
+  }
+
+  @Test
+  public void refresh_shouldRefreshInsertionTimeOnRefresh() {
+    index.add("foo", 1L);
+    index.refresh("foo");
+
+    CacheKeysIndex<String>.TimedKey key = index.keys().iterator().next();
+    assertThat(key.getKey()).isEqualTo("foo");
+    assertThat(key.getCreated()).isGreaterThan(1L);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldRemoveAndConsumeKeysOlderThan() {
+    long than = 5l;
+    index.add("newerThan", 10L);
+    index.add("olderThan", 2L);
+    Consumer<String> consumer = mock(Consumer.class);
+
+    index.removeAndConsumeKeysOlderThan(than, consumer);
+
+    verify(consumer).accept("olderThan");
+    assertThat(keys(index)).containsExactly("newerThan");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldReturnFalseIfThereIsNoLruToRemove() {
+    Consumer<String> consumer = mock(Consumer.class);
+
+    boolean actual = index.removeAndConsumeLruKey(consumer);
+
+    assertThat(actual).isEqualTo(false);
+    verifyNoInteractions(consumer);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void remove_shouldRemoveAndConsumeLruKey() {
+    index.add("older", 1L);
+    index.add("newer", 1L);
+    Consumer<String> consumer = mock(Consumer.class);
+
+    boolean actual = index.removeAndConsumeLruKey(consumer);
+
+    assertThat(actual).isEqualTo(true);
+    verify(consumer).accept("older");
+    assertThat(keys(index)).containsExactly("newer");
+  }
+
+  @Test
+  public void persist_shouldPersistAndRestoreKeys() {
+    index.add("older", 1L);
+    index.add("newer", 1L);
+    assertThat(indexFile.exists()).isFalse();
+
+    index.persist();
+    assertThat(indexFile.isFile()).isTrue();
+    assertThat(indexFile.canRead()).isTrue();
+
+    index.clear();
+    assertThat(keys(index)).isEmpty();
+
+    index.restore(true);
+    assertThat(keys(index)).containsExactly("newer", "older");
+  }
+
+  @Test
+  public void restore_shouldDeleteExistingTemporaryIndexStorageFileDuringRestore()
+      throws IOException {
+    File indexTempFile = tempIndexFile(indexFile);
+    indexTempFile.createNewFile();
+    assertThat(indexTempFile.isFile()).isTrue();
+
+    index.restore(true);
+
+    assertThat(indexTempFile.exists()).isFalse();
+  }
+
+  private static List<String> keys(CacheKeysIndex<String> index) {
+    return index.keys().stream().map(CacheKeysIndex.TimedKey::getKey).collect(toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java
new file mode 100644
index 0000000..f3e909d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigDefaultsIT.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.modules.cache.chroniclemap;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.AbstractDaemonTest;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.UseSsh;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults;
+import java.util.Set;
+import org.junit.Test;
+
+@UseLocalDisk
+@UseSsh
+public class ChronicleMapCacheConfigDefaultsIT extends AbstractDaemonTest {
+  @Override
+  public ChronicleMapCacheModule createModule() {
+    // CacheSerializers is accumulating cache names from different test executions in CI therefore
+    // it has to be cleared before this test
+    CacheSerializers.clear();
+    return new ChronicleMapCacheModule();
+  }
+
+  @Test
+  // the following caches are not persisted by default hence `diskLimit` needs to be set so that
+  // Gerrit persists them
+  @GerritConfig(name = "cache.change_notes.diskLimit", value = "1")
+  @GerritConfig(name = "cache.external_ids_map.diskLimit", value = "1")
+  public void shouldAllPersistentCachesHaveDefaultConfiguration() throws Exception {
+    Set<String> allCaches = CacheSerializers.getSerializersNames();
+    assertThat(Defaults.defaultMap.keySet()).containsExactlyElementsIn(allCaches);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
index 8cb28c1..d030732 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheConfigTest.java
@@ -14,13 +14,16 @@
 package com.googlesource.gerrit.modules.cache.chroniclemap;
 
 import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_AVG_KEY_SIZE;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_AVG_VALUE_SIZE;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_MAX_BLOAT_FACTOR;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_MAX_ENTRIES;
 import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERCENTAGE_FREE_SPACE_EVICTION_THRESHOLD;
-import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERCENTAGE_HOT_KEYS;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheConfig.Defaults.DEFAULT_PERSIST_INDEX_EVERY;
+import static com.googlesource.gerrit.modules.cache.chroniclemap.ChronicleMapCacheFactory.PRUNE_DELAY;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.server.config.SitePaths;
 import java.io.File;
@@ -33,7 +36,11 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class ChronicleMapCacheConfigTest {
 
   private final String cacheDirectory = ".";
@@ -47,6 +54,8 @@
   private SitePaths sitePaths;
   private StoredConfig gerritConfig;
 
+  @Mock CachesWithoutChronicleMapConfigMetric cachesWithoutConfigMetricMock;
+
   @Before
   public void setUp() throws Exception {
     sitePaths = new SitePaths(temporaryFolder.newFolder().toPath());
@@ -61,10 +70,10 @@
   }
 
   @Test
-  public void shouldProvidePersistedFile() throws Exception {
+  public void shouldProvideCacheFile() throws Exception {
     assertThat(
             configUnderTest(gerritConfig)
-                .getPersistedFile()
+                .getCacheFile()
                 .toPath()
                 .getParent()
                 .toRealPath()
@@ -73,6 +82,19 @@
   }
 
   @Test
+  public void shouldProvideIndexFileThatIsRelatedToCacheFile() {
+    ChronicleMapCacheConfig config = configUnderTest(gerritConfig);
+    File cacheFile = config.getCacheFile();
+    File indexFile = config.getIndexFile();
+
+    assertThat(indexFile.getParentFile()).isEqualTo(cacheFile.getParentFile());
+    String cacheFileName = cacheFile.getName();
+    assertThat(indexFile.getName())
+        .isEqualTo(
+            String.format("%s.index", cacheFileName.substring(0, cacheFileName.indexOf(".dat"))));
+  }
+
+  @Test
   public void shouldProvideConfiguredMaxEntriesWhenDefined() throws Exception {
     long maxEntries = 10;
     gerritConfig.setLong("cache", cacheKey, "maxEntries", maxEntries);
@@ -178,38 +200,48 @@
   }
 
   @Test
-  public void shouldProvidePercentageHotKeysDefault() throws Exception {
-    assertThat(configUnderTest(gerritConfig).getpercentageHotKeys())
-        .isEqualTo(DEFAULT_PERCENTAGE_HOT_KEYS);
+  public void shouldProvideDefaultIndexPersistEveryValuesWhenNotConfigured() {
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery()).isEqualTo(DEFAULT_PERSIST_INDEX_EVERY);
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(30L);
   }
 
   @Test
-  public void shouldProvidePercentageHotKeysWhenConfigured() throws Exception {
-    int percentageHotKeys = 20;
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", percentageHotKeys);
-    gerritConfig.save();
-
-    assertThat(configUnderTest(gerritConfig).getpercentageHotKeys()).isEqualTo(percentageHotKeys);
+  public void shouldPersistIndexEveryBePruneDelayWhenPersistIndexEveryIsLowerThanPruneDelay() {
+    gerritConfig.setString(
+        "cache", null, "persistIndexEvery", String.format("%ds", PRUNE_DELAY - 1L));
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery()).isEqualTo(Duration.ofSeconds(PRUNE_DELAY));
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(1L);
   }
 
   @Test
-  public void shouldThrowWhenPercentageHotKeysIs100() throws Exception {
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", 100);
-    gerritConfig.save();
-
-    assertThrows(IllegalArgumentException.class, () -> configUnderTest(gerritConfig));
+  public void shouldPersistIndexEveryBeRoundedDownToAMultiplyOfPruneDelay() {
+    gerritConfig.setString(
+        "cache", null, "persistIndexEvery", String.format("%ds", 2L * PRUNE_DELAY + 1L));
+    ChronicleMapCacheConfig configUnderTest = configUnderTest(gerritConfig);
+    assertThat(configUnderTest.getPersistIndexEvery())
+        .isEqualTo(Duration.ofSeconds(2L * PRUNE_DELAY));
+    assertThat(configUnderTest.getPersistIndexEveryNthPrune()).isEqualTo(2L);
   }
 
   @Test
-  public void shouldThrowWhenPercentageHotKeysIs0() throws Exception {
-    gerritConfig.setInt("cache", cacheKey, "percentageHotKeys", 0);
-    gerritConfig.save();
+  public void shouldIncrementCacheMetricWhenCacheHasNoDedicatedConfiguration() {
+    configUnderTest(gerritConfig);
 
-    assertThrows(IllegalArgumentException.class, () -> configUnderTest(gerritConfig));
+    verify(cachesWithoutConfigMetricMock, atLeastOnce()).incrementForCache(cacheKey);
+  }
+
+  @Test
+  public void shouldNotIncrementCacheMetricWhenCacheHasAtLeastSingleParameterConfigured() {
+    gerritConfig.setLong("cache", cacheKey, "maxEntries", 1L);
+    configUnderTest(gerritConfig);
+
+    verify(cachesWithoutConfigMetricMock, never()).incrementForCache(cacheName);
   }
 
   private ChronicleMapCacheConfig configUnderTest(StoredConfig gerritConfig) {
-    File persistentFile =
+    File cacheFile =
         ChronicleMapCacheFactory.fileName(
             sitePaths.site_path.resolve(cacheDirectory), cacheName, version);
     sitePaths
@@ -218,6 +250,11 @@
         .toFile();
 
     return new ChronicleMapCacheConfig(
-        gerritConfig, cacheKey, persistentFile, expireAfterWrite, refreshAfterWrite);
+        gerritConfig,
+        cachesWithoutConfigMetricMock,
+        cacheKey,
+        cacheFile,
+        expireAfterWrite,
+        refreshAfterWrite);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
index 444762b..389fbb1 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheIT.java
@@ -17,7 +17,6 @@
 import static com.google.common.truth.Truth8.assertThat;
 
 import com.google.common.cache.Cache;
-import com.google.common.truth.Truth8;
 import com.google.gerrit.acceptance.AbstractDaemonTest;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.entities.Project;
@@ -68,7 +67,7 @@
     String newProjectName = name("newProject");
     adminRestSession.put("/projects/" + newProjectName).assertCreated();
 
-    Truth8.assertThat(projectCache.get(Project.nameKey(newProjectName))).isPresent();
+    assertThat(projectCache.get(Project.nameKey(newProjectName))).isPresent();
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
index 0041c53..b943c07 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/ChronicleMapCacheTest.java
@@ -16,10 +16,12 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.Mockito.mock;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.gerrit.acceptance.AbstractDaemonTest;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.WaitUtil;
@@ -306,7 +308,6 @@
     final String fooValue = "foo";
 
     gerritConfig.setInt("cache", testCacheName, "maxEntries", 2);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", 10);
     gerritConfig.setInt("cache", testCacheName, "avgKeySize", "foo1".getBytes().length);
     gerritConfig.setInt("cache", testCacheName, "avgValueSize", valueSize(fooValue));
     gerritConfig.save();
@@ -326,7 +327,6 @@
       throws Exception {
     final String fooValue = "foo";
     gerritConfig.setInt("cache", testCacheName, "maxEntries", 2);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", 10);
     gerritConfig.setInt("cache", testCacheName, "avgKeySize", "foo1".getBytes().length);
     gerritConfig.setInt("cache", testCacheName, "avgValueSize", valueSize(fooValue));
     gerritConfig.save();
@@ -347,7 +347,6 @@
   public void shouldEvictEntriesUntilFreeSpaceIsRecovered() throws Exception {
     final int uuidSize = valueSize(UUID.randomUUID().toString());
     gerritConfig.setInt("cache", "foo", "maxEntries", 50);
-    gerritConfig.setInt("cache", "foo", "percentageHotKeys", 10);
     gerritConfig.setInt("cache", "foo", "avgKeySize", uuidSize);
     gerritConfig.setInt("cache", "foo", "avgValueSize", uuidSize);
     gerritConfig.save();
@@ -458,104 +457,109 @@
   }
 
   @Test
-  public void shouldTriggerHotKeysCapacityCacheMetric() throws Exception {
+  public void shouldTriggerKeysIndexSizeCacheMetric() throws Exception {
     String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 60;
     int maxEntries = 10;
-    int expectedCapacity = 6;
-    String hotKeysCapacityMetricName = "cache/chroniclemap/hot_keys_capacity_" + testCacheName;
-    gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
-    gerritConfig.save();
-
-    newCacheWithMetrics(testCacheName, cachedValue);
-
-    assertThat(getMetric(hotKeysCapacityMetricName).getValue()).isEqualTo(expectedCapacity);
-  }
-
-  @Test
-  public void shouldTriggerHotKeysSizeCacheMetric() throws Exception {
-    String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 30;
-    int maxEntries = 10;
-    int maxHotKeyCapacity = 3;
+    int expectedKeysSize = 3;
     final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
-    String hotKeysSizeMetricName = "cache/chroniclemap/hot_keys_size_" + testCacheName;
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + testCacheName;
     gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
     gerritConfig.save();
 
     ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, cachedValue);
 
-    assertThat(getMetric(hotKeysSizeMetricName).getValue()).isEqualTo(0);
+    assertThat(getMetric(keysIndexSizeMetricName).getValue()).isEqualTo(0);
 
-    for (int i = 0; i < maxHotKeyCapacity; i++) {
+    for (int i = 0; i < expectedKeysSize; i++) {
       cache.put(cachedValue + i, cachedValue);
     }
 
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == maxHotKeyCapacity,
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == expectedKeysSize,
         METRIC_TRIGGER_TIMEOUT);
-
-    cache.put(cachedValue + maxHotKeyCapacity + 1, cachedValue);
-
-    assertThrows(
-        InterruptedException.class,
-        () ->
-            WaitUtil.waitUntil(
-                () -> (int) getMetric(hotKeysSizeMetricName).getValue() > maxHotKeyCapacity,
-                METRIC_TRIGGER_TIMEOUT));
   }
 
   @Test
-  public void shouldResetHotKeysWhenInvalidateAll() throws Exception {
-    String cachedValue = UUID.randomUUID().toString();
-    int percentageHotKeys = 30;
+  public void shouldTriggerKeysIndexAddLatencyCacheMetric() throws Exception {
     int maxEntries = 10;
-    int maxHotKeyCapacity = 3;
     final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
-    String hotKeysSizeMetricName = "cache/chroniclemap/hot_keys_size_" + testCacheName;
+    String keysIndexAddLatencyMetricName =
+        "cache/chroniclemap/keys_index_add_latency_" + testCacheName;
     gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
-    gerritConfig.setInt("cache", testCacheName, "percentageHotKeys", percentageHotKeys);
+    gerritConfig.save();
+
+    ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, null);
+    assertThat(getTimer(keysIndexAddLatencyMetricName).getCount()).isEqualTo(0L);
+
+    String cachedValue = UUID.randomUUID().toString();
+    cache.put(cachedValue, cachedValue);
+
+    WaitUtil.waitUntil(
+        () -> getTimer(keysIndexAddLatencyMetricName).getCount() == 1L, METRIC_TRIGGER_TIMEOUT);
+  }
+
+  @Test
+  public void shouldResetKeysIndexWhenInvalidateAll() throws Exception {
+    String cachedValue = UUID.randomUUID().toString();
+    int maxEntries = 10;
+    int expectedKeysSize = 3;
+    final Duration METRIC_TRIGGER_TIMEOUT = Duration.ofSeconds(2);
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + testCacheName;
+    gerritConfig.setInt("cache", testCacheName, "maxEntries", maxEntries);
     gerritConfig.save();
 
     ChronicleMapCacheImpl<String, String> cache = newCacheWithMetrics(testCacheName, cachedValue);
 
-    for (int i = 0; i < maxHotKeyCapacity; i++) {
+    for (int i = 0; i < expectedKeysSize; i++) {
       cache.put(cachedValue + i, cachedValue);
     }
 
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == maxHotKeyCapacity,
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == expectedKeysSize,
         METRIC_TRIGGER_TIMEOUT);
-
     cache.invalidateAll();
-
     WaitUtil.waitUntil(
-        () -> (int) getMetric(hotKeysSizeMetricName).getValue() == 0, METRIC_TRIGGER_TIMEOUT);
+        () -> (int) getMetric(keysIndexSizeMetricName).getValue() == 0, METRIC_TRIGGER_TIMEOUT);
   }
 
   @Test
   public void shouldSanitizeUnwantedCharsInMetricNames() throws Exception {
     String cacheName = "very+confusing.cache#name";
     String sanitized = "very_confusing_cache_name";
-    String hotKeySizeMetricName = "cache/chroniclemap/hot_keys_size_" + sanitized;
     String percentageFreeMetricName = "cache/chroniclemap/percentage_free_space_" + sanitized;
     String autoResizeMetricName = "cache/chroniclemap/remaining_autoresizes_" + sanitized;
     String maxAutoResizeMetricName = "cache/chroniclemap/max_autoresizes_" + sanitized;
-    String hotKeyCapacityMetricName = "cache/chroniclemap/hot_keys_capacity_" + sanitized;
+    String keysIndexSizeMetricName = "cache/chroniclemap/keys_index_size_" + sanitized;
+    String keysIndexAddLatencyMetricName = "cache/chroniclemap/keys_index_add_latency_" + sanitized;
+    String keysIndexRemoveOlderThanLatencyMetricName =
+        "cache/chroniclemap/keys_index_remove_and_consume_older_than_latency_" + sanitized;
+    String keysIndexRemoveLruLatencyMetricName =
+        "cache/chroniclemap/keys_index_remove_lru_key_latency_" + sanitized;
+    String keysIndexRestoreName = "cache/chroniclemap/keys_index_restore_latency_" + sanitized;
+    String keysIndexPersistName = "cache/chroniclemap/keys_index_persist_latency_" + sanitized;
+    String keysIndexRestoreFailuresName =
+        "cache/chroniclemap/keys_index_restore_failures_" + sanitized;
+    String keysIndexPersistFailuresName =
+        "cache/chroniclemap/keys_index_persist_failures_" + sanitized;
 
     newCacheWithMetrics(cacheName, null);
 
-    getMetric(hotKeySizeMetricName);
     getMetric(percentageFreeMetricName);
     getMetric(autoResizeMetricName);
     getMetric(maxAutoResizeMetricName);
-    getMetric(hotKeyCapacityMetricName);
+    getMetric(keysIndexSizeMetricName);
+    getTimer(keysIndexAddLatencyMetricName);
+    getTimer(keysIndexRemoveOlderThanLatencyMetricName);
+    getTimer(keysIndexRemoveLruLatencyMetricName);
+    getTimer(keysIndexRestoreName);
+    getTimer(keysIndexPersistName);
+    getCounter(keysIndexRestoreFailuresName);
+    getCounter(keysIndexPersistFailuresName);
   }
 
   private int valueSize(String value) {
-    final TimedValueMarshaller<String> marshaller = new TimedValueMarshaller<>(testCacheName);
+    final TimedValueMarshaller<String> marshaller =
+        new TimedValueMarshaller<>(metricMaker, testCacheName);
 
     Bytes<ByteBuffer> out = Bytes.elasticByteBuffer();
     marshaller.write(out, new TimedValue<>(value));
@@ -608,6 +612,7 @@
     ChronicleMapCacheConfig config =
         new ChronicleMapCacheConfig(
             gerritConfig,
+            mock(CachesWithoutChronicleMapConfigMetric.class),
             cacheDef.configKey(),
             persistentFile,
             expireAfterWrite != null ? expireAfterWrite : Duration.ZERO,
@@ -654,4 +659,10 @@
     assertWithMessage(name).that(counter).isNotNull();
     return counter;
   }
+
+  private Timer getTimer(String name) {
+    Timer timer = (Timer) metricRegistry.getMetrics().get(name);
+    assertWithMessage(name).that(timer).isNotNull();
+    return timer;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java
deleted file mode 100644
index 3d435c9..0000000
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/InMemoryLRUTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.modules.cache.chroniclemap;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import org.junit.Test;
-
-public class InMemoryLRUTest {
-
-  @Test
-  public void add_shouldUpdateElementPositionWhenAlreadyInSet() {
-    final InMemoryLRU<Object> map = new InMemoryLRU<>(2);
-
-    map.add("A");
-    map.add("B");
-
-    assertThat(map.toArray()).asList().containsExactly("A", "B");
-
-    map.add("A");
-    assertThat(map.toArray()).asList().containsExactly("B", "A");
-  }
-
-  @Test
-  public void add_shouldEvictLRUElement() {
-    final InMemoryLRU<Object> map = new InMemoryLRU<>(2);
-
-    map.add("A");
-    map.add("B");
-    map.add("C");
-
-    assertThat(map.toArray()).asList().containsExactly("B", "C");
-  }
-
-  @Test
-  public void remove_unexistingEntryShouldReturnNull() {
-    InMemoryLRU<Object> map = new InMemoryLRU<>(1);
-
-    assertThat(map.remove("foo")).isNull();
-  }
-
-  @Test
-  public void remove_unexistingEntryShouldReturnTrue() {
-    InMemoryLRU<Object> map = new InMemoryLRU<>(1);
-
-    map.add("foo");
-
-    assertThat(map.remove("foo")).isTrue();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
index b630e36..b4eb4ca 100644
--- a/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
+++ b/src/test/java/com/googlesource/gerrit/modules/cache/chroniclemap/TimedValueMarshallerTest.java
@@ -15,6 +15,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
+import com.google.gerrit.acceptance.TestMetricMaker;
 import com.google.gerrit.server.cache.serialize.ObjectIdCacheSerializer;
 import java.nio.ByteBuffer;
 import net.openhft.chronicle.bytes.Bytes;
@@ -35,7 +36,8 @@
   public void shouldSerializeAndDeserializeBack() {
     ObjectId id = ObjectId.fromString("1234567890123456789012345678901234567890");
     long timestamp = 1600329018L;
-    TimedValueMarshaller<ObjectId> marshaller = new TimedValueMarshaller<>(TEST_CACHE_NAME);
+    TimedValueMarshaller<ObjectId> marshaller =
+        new TimedValueMarshaller<>(new TestMetricMaker(), TEST_CACHE_NAME);
 
     final TimedValue<ObjectId> wrapped = new TimedValue<>(id, timestamp);
 
