Fix threading issue in Diff cache

We have observed occasional failures in computing diffs. The
computation fails with a Zlib exception making it look like
the pack file is currupted. Fsck, however, tells that the
pack file is fine.

Upon closer investigation, it seems like there is a case
in the diff cache were we process on a different thread
and cancel in case of a timeout. The objects we pass
around (ObjectReader and DiffFormatter) are not thread safe.

The JavaDoc of ObjectReader states that it is not thread
safe and the Dfs implementation restates that. Checking
the implementation also shows that it's not thread safe
since it uses member variables to store intermediary
results. Other implementations might be thread safe, but
if the abstract class doesn't guarantee that, we can't
rely on it.

In Java, a timeout of a future does not guarantee to
have actually terminated the work item when it occurs. So
there could be a situation where the work item has timed out,
but we are still processing anyway. The main thread then
continues to process a different diff in a different region
of the pack. The reused ObjectReader then gets confused as
it races to read different parts of the pack.

This is an attempt to fix the problem by using a pool to
share DiffFormatter instances. DiffFormatters that are closed
will be returned to the pool. If the pool itself is already
closed, the a close call to the DiffFormatter's wrapper
(Handle) will propagate to the DiffFormatter and consequently
the ObjectReader). Closing the pool will close any resources
currently in the pool.

We run into this problem, because the diff cache uses getAll
which sequentially loads many diffs for a single request.

Google-Bug-Id: b/209491483
Release-Notes: Fix threading issue in diff cache
Change-Id: I51bd1a73f573feab0eb58f825f28024852c17e47
(cherry picked from commit 4f5f8c7e80788b168a3a5d74568198d894019de4)
diff --git a/java/com/google/gerrit/server/patch/gitfilediff/GitFileDiffCacheImpl.java b/java/com/google/gerrit/server/patch/gitfilediff/GitFileDiffCacheImpl.java
index 6a59872..44af22e 100644
--- a/java/com/google/gerrit/server/patch/gitfilediff/GitFileDiffCacheImpl.java
+++ b/java/com/google/gerrit/server/patch/gitfilediff/GitFileDiffCacheImpl.java
@@ -43,6 +43,7 @@
 import com.google.gerrit.server.logging.TraceContext.TraceTimer;
 import com.google.gerrit.server.patch.DiffExecutor;
 import com.google.gerrit.server.patch.DiffNotAvailableException;
+import com.google.gerrit.server.util.git.CloseablePool;
 import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Singleton;
@@ -69,7 +70,6 @@
 import org.eclipse.jgit.lib.AbbreviatedObjectId;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.ObjectReader;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.patch.FileHeader;
 import org.eclipse.jgit.util.io.DisabledOutputStream;
@@ -218,8 +218,7 @@
                 .collect(Collectors.groupingBy(GitFileDiffCacheKey::project));
 
         for (Map.Entry<Project.NameKey, List<GitFileDiffCacheKey>> entry : byProject.entrySet()) {
-          try (Repository repo = repoManager.openRepository(entry.getKey());
-              ObjectReader reader = repo.newObjectReader()) {
+          try (Repository repo = repoManager.openRepository(entry.getKey())) {
 
             // Grouping keys by diff options because each group of keys will be processed with a
             // separate call to JGit using the DiffFormatter object.
@@ -228,7 +227,7 @@
 
             for (Map.Entry<DiffOptions, List<GitFileDiffCacheKey>> group :
                 optionsGroups.entrySet()) {
-              result.putAll(loadAllImpl(repo, reader, group.getKey(), group.getValue()));
+              result.putAll(loadAllImpl(repo, group.getKey(), group.getValue()));
             }
           }
         }
@@ -243,42 +242,46 @@
      * @return The git file diffs for all input keys.
      */
     private Map<GitFileDiffCacheKey, GitFileDiff> loadAllImpl(
-        Repository repo, ObjectReader reader, DiffOptions options, List<GitFileDiffCacheKey> keys)
+        Repository repo, DiffOptions options, List<GitFileDiffCacheKey> keys)
         throws IOException, DiffNotAvailableException {
       ImmutableMap.Builder<GitFileDiffCacheKey, GitFileDiff> result =
           ImmutableMap.builderWithExpectedSize(keys.size());
       Map<GitFileDiffCacheKey, String> filePaths =
           keys.stream().collect(Collectors.toMap(identity(), GitFileDiffCacheKey::newFilePath));
-      DiffFormatter formatter = createDiffFormatter(options, repo, reader);
-      ListMultimap<String, DiffEntry> diffEntries =
-          loadDiffEntries(formatter, options, filePaths.values());
-      for (GitFileDiffCacheKey key : filePaths.keySet()) {
-        String newFilePath = filePaths.get(key);
-        if (!diffEntries.containsKey(newFilePath)) {
-          result.put(
-              key,
-              GitFileDiff.empty(
-                  AbbreviatedObjectId.fromObjectId(key.oldTree()),
-                  AbbreviatedObjectId.fromObjectId(key.newTree()),
-                  newFilePath));
-          continue;
+      try (CloseablePool<DiffFormatter> diffPool =
+          new CloseablePool<>(() -> createDiffFormatter(options, repo))) {
+        ListMultimap<String, DiffEntry> diffEntries;
+        try (CloseablePool<DiffFormatter>.Handle formatter = diffPool.get()) {
+          diffEntries = loadDiffEntries(formatter.get(), options, filePaths.values());
         }
-        List<DiffEntry> entries = diffEntries.get(newFilePath);
-        if (entries.size() == 1) {
-          result.put(key, createGitFileDiff(entries.get(0), formatter, key));
-        } else {
-          // Handle when JGit returns two {Added, Deleted} entries for the same file. This happens,
-          // for example, when a file's mode is changed between patchsets (e.g. converting a
-          // symlink to a regular file). We combine both diff entries into a single entry with
-          // {changeType = Rewrite}.
-          List<GitFileDiff> gitDiffs = new ArrayList<>();
-          for (DiffEntry entry : diffEntries.get(newFilePath)) {
-            gitDiffs.add(createGitFileDiff(entry, formatter, key));
+        for (GitFileDiffCacheKey key : filePaths.keySet()) {
+          String newFilePath = filePaths.get(key);
+          if (!diffEntries.containsKey(newFilePath)) {
+            result.put(
+                key,
+                GitFileDiff.empty(
+                    AbbreviatedObjectId.fromObjectId(key.oldTree()),
+                    AbbreviatedObjectId.fromObjectId(key.newTree()),
+                    newFilePath));
+            continue;
           }
-          result.put(key, createRewriteEntry(gitDiffs));
+          List<DiffEntry> entries = diffEntries.get(newFilePath);
+          if (entries.size() == 1) {
+            result.put(key, createGitFileDiff(entries.get(0), key, diffPool));
+          } else {
+            // Handle when JGit returns two {Added, Deleted} entries for the same file. This
+            // happens, for example, when a file's mode is changed between patchsets (e.g.
+            // converting a symlink to a regular file). We combine both diff entries into a single
+            // entry with {changeType = Rewrite}.
+            List<GitFileDiff> gitDiffs = new ArrayList<>();
+            for (DiffEntry entry : diffEntries.get(newFilePath)) {
+              gitDiffs.add(createGitFileDiff(entry, key, diffPool));
+            }
+            result.put(key, createRewriteEntry(gitDiffs));
+          }
         }
+        return result.build();
       }
-      return result.build();
     }
 
     private static ListMultimap<String, DiffEntry> loadDiffEntries(
@@ -299,10 +302,9 @@
                   MultimapBuilder.treeKeys().arrayListValues()::build));
     }
 
-    private static DiffFormatter createDiffFormatter(
-        DiffOptions diffOptions, Repository repo, ObjectReader reader) {
+    private static DiffFormatter createDiffFormatter(DiffOptions diffOptions, Repository repo) {
       try (DiffFormatter diffFormatter = new DiffFormatter(DisabledOutputStream.INSTANCE)) {
-        diffFormatter.setReader(reader, repo.getConfig());
+        diffFormatter.setRepository(repo);
         RawTextComparator cmp = comparatorFor(diffOptions.whitespace());
         diffFormatter.setDiffComparator(cmp);
         if (diffOptions.renameScore() != -1) {
@@ -345,25 +347,31 @@
      *       timeout enforcement.
      */
     private GitFileDiff createGitFileDiff(
-        DiffEntry diffEntry, DiffFormatter formatter, GitFileDiffCacheKey key) throws IOException {
+        DiffEntry diffEntry, GitFileDiffCacheKey key, CloseablePool<DiffFormatter> diffPool)
+        throws IOException {
       if (!key.useTimeout()) {
-        FileHeader fileHeader = formatter.toFileHeader(diffEntry);
-        return GitFileDiff.create(diffEntry, fileHeader);
+        try (CloseablePool<DiffFormatter>.Handle formatter = diffPool.get()) {
+          FileHeader fileHeader = formatter.get().toFileHeader(diffEntry);
+          return GitFileDiff.create(diffEntry, fileHeader);
+        }
       }
-      Future<FileHeader> fileHeaderFuture =
+      // This submits the DiffFormatter to a different thread. The CloseablePool and our usage of it
+      // ensures that any DiffFormatter instance and the ObjectReader it references internally is
+      // only used by a single thread concurrently. However, ObjectReaders have a reference to
+      // Repository which might not be thread safe (FileRepository is, DfsRepository might not).
+      // This could lead to a race condition.
+      Future<GitFileDiff> fileDiffFuture =
           diffExecutor.submit(
               () -> {
-                synchronized (diffEntry) {
-                  return formatter.toFileHeader(diffEntry);
+                try (CloseablePool<DiffFormatter>.Handle formatter = diffPool.get()) {
+                  return GitFileDiff.create(diffEntry, formatter.get().toFileHeader(diffEntry));
                 }
               });
       try {
         // We employ the timeout because of a bug in Myers diff in JGit. See
         // bugs.chromium.org/p/gerrit/issues/detail?id=487 for more details. The bug may happen
         // if the algorithm used in diffs is HISTOGRAM_WITH_FALLBACK_MYERS.
-        fileHeaderFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
-        FileHeader fileHeader = formatter.toFileHeader(diffEntry);
-        return GitFileDiff.create(diffEntry, fileHeader);
+        return fileDiffFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
       } catch (InterruptedException | TimeoutException e) {
         // If timeout happens, create a negative result
         metrics.timeouts.increment();
diff --git a/java/com/google/gerrit/server/util/git/BUILD b/java/com/google/gerrit/server/util/git/BUILD
index 4f4ba83..bbc6bf0 100644
--- a/java/com/google/gerrit/server/util/git/BUILD
+++ b/java/com/google/gerrit/server/util/git/BUILD
@@ -7,5 +7,6 @@
     deps = [
         "//java/com/google/gerrit/entities",
         "//lib:jgit",
+        "//lib/flogger:api",
     ],
 )
diff --git a/java/com/google/gerrit/server/util/git/CloseablePool.java b/java/com/google/gerrit/server/util/git/CloseablePool.java
new file mode 100644
index 0000000..442bd09
--- /dev/null
+++ b/java/com/google/gerrit/server/util/git/CloseablePool.java
@@ -0,0 +1,116 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.util.git;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Pool to manage resources that need to be closed but to whom we might lose the reference to or
+ * where closing resources individually is not always possible.
+ *
+ * <p>This pool can be used when we want to reuse closable resources in a multithreaded context.
+ * Example:
+ *
+ * <pre>{@code
+ * try (CloseablePool<T> pool = new CloseablePool(() -> new T())) {
+ *   for (int i = 0; i < 100; i++) {
+ *     executor.submit(() -> {
+ *       try (CloseablePool<T>.Handle handle = pool.get()) {
+ *         // Do work that might potentially take longer than the timeout.
+ *         handle.get(); // pooled instance to be used
+ *       }
+ *     }).get(1000, MILLISECONDS);
+ *   }
+ * }
+ * }</pre>
+ */
+public class CloseablePool<T extends AutoCloseable> implements AutoCloseable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Supplier<T> tCreator;
+  private List<T> ts;
+
+  /**
+   * Instantiate a new pool. The {@link Supplier} must be capable of creating a new instance on
+   * every call.
+   */
+  public CloseablePool(Supplier<T> tCreator) {
+    this.ts = new ArrayList<>();
+    this.tCreator = tCreator;
+  }
+
+  /**
+   * Get a shared instance or create a new instance. Close the returned handle to return it to the
+   * pool.
+   */
+  public synchronized Handle get() {
+    if (ts.isEmpty()) {
+      return new Handle(tCreator.get());
+    }
+    return new Handle(ts.remove(ts.size() - 1));
+  }
+
+  private synchronized boolean discard(T t) {
+    if (ts != null) {
+      ts.add(t);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized void close() {
+    for (T t : ts)
+      try {
+        t.close();
+      } catch (Exception e) {
+        logger.atWarning().withCause(e).log(
+            "Failed to close resource %s in CloseablePool %s", t, this);
+      }
+    ts = null;
+  }
+
+  /**
+   * Wrapper around an {@link AutoCloseable}. Will try to return the resource to the pool and close
+   * it in case the pool was already closed.
+   */
+  public class Handle implements AutoCloseable {
+    private final T t;
+
+    private Handle(T t) {
+      this.t = t;
+    }
+
+    /** Returns the managed instance. */
+    public T get() {
+      return t;
+    }
+
+    @Override
+    public void close() {
+      if (!discard(t)) {
+        try {
+          t.close();
+        } catch (Exception e) {
+          logger.atWarning().withCause(e).log(
+              "Failed to close resource %s in CloseablePool %s", this, CloseablePool.this);
+        }
+      }
+    }
+  }
+}