Perform change update on multiple threads

When multiple changes need to be created or updated for a single push
operation they are now inserted into the database by parallel threads,
up to the maximum allowed thread count. The current thread is used
when the thread pool is already fully in use, falling back to the
prior behavior where each concurrent push operation can do its own
concurrent database update. The thread pool exists to reduce latency
so long as there are sufficient threads available.

This helps push times on databases that are high latency, such as
database servers that are running on a different machine from the
Gerrit server itself, e.g. gerrit.googlesource.com.

The new thread pool is disabled by default, limiting the overhead to
servers that have good latency with their database, such as using
in-process H2 database, or a MySQL or PostgreSQL on the same host.

Change-Id: I7d7368cee99a47e3f2ad1e753cc3f7e1c82d37b0
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 0f234f7..5606768 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -1796,6 +1796,18 @@
 +
 Defaults to the number of available CPUs according to the Java runtime.
 
+[[receive.changeUpdateThreads]]receive.changeUpdateThreads::
++
+Number of threads to perform change creation or patch set updates
+concurrently. Each thread uses its own database connection from
+the database connection pool, and if all threads are busy then
+main receive thread will also perform a change creation or patch
+set update.
++
+Defaults to 1, using only the main receive thread. This feature is for
+databases with very high latency that can benfit from concurrent
+operations when multiple changes are impacted at once.
+
 [[receive.timeout]]receive.timeout::
 +
 Overall timeout on the time taken to process the change data in
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
index a295c49..d7a487d 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/ApprovalsUtil.java
@@ -29,7 +29,6 @@
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -74,16 +73,29 @@
    *
    * @param change Change to update
    * @throws OrmException
-   * @throws IOException
    * @return List<PatchSetApproval> The previous approvals
    */
   public List<PatchSetApproval> copyVetosToLatestPatchSet(Change change)
-      throws OrmException, IOException {
+      throws OrmException {
+    return copyVetosToLatestPatchSet(db, change);
+  }
+
+  /**
+   * Moves the PatchSetApprovals to the last PatchSet on the change while
+   * keeping the vetos.
+   *
+   * @param db database connection to use for updates.
+   * @param change Change to update
+   * @throws OrmException
+   * @return List<PatchSetApproval> The previous approvals
+   */
+  public List<PatchSetApproval> copyVetosToLatestPatchSet(ReviewDb db,
+      Change change) throws OrmException {
     PatchSet.Id source;
     if (change.getNumberOfPatchSets() > 1) {
       source = new PatchSet.Id(change.getId(), change.getNumberOfPatchSets() - 1);
     } else {
-      throw new IOException("Previous patch set could not be found");
+      throw new OrmException("Previous patch set could not be found");
     }
 
     PatchSet.Id dest = change.currPatchSetId();
@@ -103,18 +115,9 @@
     return patchSetApprovals;
   }
 
-
-  /** Attach reviewers to a change. */
-  public void addReviewers(Change change, PatchSet ps, PatchSetInfo info,
-      Set<Account.Id> wantReviewers) throws OrmException {
-    Set<Id> existing = Sets.<Account.Id> newHashSet();
-    addReviewers(change, ps, info, wantReviewers, existing);
-  }
-
-  /** Attach reviewers to a change. */
-  public void addReviewers(Change change, PatchSet ps, PatchSetInfo info,
-      Set<Account.Id> wantReviewers, Set<Account.Id> existingReviewers)
-      throws OrmException {
+  public void addReviewers(ReviewDb db, Change change, PatchSet ps,
+      PatchSetInfo info, Set<Id> wantReviewers,
+      Set<Account.Id> existingReviewers) throws OrmException {
     List<ApprovalType> allTypes = approvalTypes.getApprovalTypes();
     if (allTypes.isEmpty()) {
       return;
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java
new file mode 100644
index 0000000..3452bb0
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeUpdateExecutor.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.git;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+
+/**
+ * Marker on the global {@link ListeningExecutorService} used by
+ * {@link ReceiveCommits} to create or replace changes.
+ */
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ChangeUpdateExecutor {
+}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
index b2742d5..8316f7b 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
@@ -21,6 +21,7 @@
 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD;
 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
 
+import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.LinkedListMultimap;
@@ -28,6 +29,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.gerrit.common.ChangeHooks;
 import com.google.gerrit.common.PageLinks;
 import com.google.gerrit.common.data.Capable;
@@ -67,6 +71,7 @@
 import com.google.gwtorm.server.AtomicUpdate;
 import com.google.gwtorm.server.OrmException;
 import com.google.gwtorm.server.ResultSet;
+import com.google.gwtorm.server.SchemaFactory;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
@@ -106,6 +111,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -208,11 +214,23 @@
     }
   }
 
+  private static final Function<Exception, OrmException> ORM_EXCEPTION =
+      new Function<Exception, OrmException>() {
+        @Override
+        public OrmException apply(Exception input) {
+          if (input instanceof OrmException) {
+            return (OrmException) input;
+          }
+          return new OrmException("Error updating database", input);
+        }
+      };
+
   private final Set<Account.Id> reviewerId = new HashSet<Account.Id>();
   private final Set<Account.Id> ccId = new HashSet<Account.Id>();
 
   private final IdentifiedUser currentUser;
   private final ReviewDb db;
+  private final SchemaFactory<ReviewDb> schemaFactory;
   private final AccountResolver accountResolver;
   private final CreateChangeSender.Factory createChangeSenderFactory;
   private final MergedSender.Factory mergedSenderFactory;
@@ -228,6 +246,7 @@
   private final TrackingFooters trackingFooters;
   private final TagCache tagCache;
   private final WorkQueue workQueue;
+  private final ListeningExecutorService changeUpdateExector;
   private final RequestScopePropagator requestScopePropagator;
 
   private final ProjectControl projectControl;
@@ -263,6 +282,7 @@
 
   @Inject
   ReceiveCommits(final ReviewDb db,
+      final SchemaFactory<ReviewDb> schemaFactory,
       final AccountResolver accountResolver,
       final CreateChangeSender.Factory createChangeSenderFactory,
       final MergedSender.Factory mergedSenderFactory,
@@ -278,6 +298,7 @@
       @GerritPersonIdent final PersonIdent gerritIdent,
       final TrackingFooters trackingFooters,
       final WorkQueue workQueue,
+      @ChangeUpdateExecutor ListeningExecutorService changeUpdateExector,
       final RequestScopePropagator requestScopePropagator,
 
       @Assisted final ProjectControl projectControl,
@@ -285,6 +306,7 @@
       final SubmoduleOp.Factory subOpFactory) throws IOException {
     this.currentUser = (IdentifiedUser) projectControl.getCurrentUser();
     this.db = db;
+    this.schemaFactory = schemaFactory;
     this.accountResolver = accountResolver;
     this.createChangeSenderFactory = createChangeSenderFactory;
     this.mergedSenderFactory = mergedSenderFactory;
@@ -300,6 +322,7 @@
     this.trackingFooters = trackingFooters;
     this.tagCache = tagCache;
     this.workQueue = workQueue;
+    this.changeUpdateExector = changeUpdateExector;
     this.requestScopePropagator = requestScopePropagator;
 
     this.projectControl = projectControl;
@@ -593,7 +616,7 @@
         }
       } else if (replace.cmd.getResult() == OK) {
         try {
-          if (replace.insertPatchSet() != null) {
+          if (replace.insertPatchSet().checkedGet() != null) {
             replace.inputCommand.setResult(OK);
           }
         } catch (IOException err) {
@@ -635,14 +658,19 @@
     }
 
     try {
+      List<CheckedFuture<?, OrmException>> futures = Lists.newArrayList();
       for (ReplaceRequest replace : replaceByChange.values()) {
         if (replace.inputCommand == newChange) {
-          replace.insertPatchSet();
+          futures.add(replace.insertPatchSet());
         }
       }
 
       for (CreateRequest create : newChanges) {
-        create.insertChange();
+        futures.add(create.insertChange());
+      }
+
+      for (CheckedFuture<?, OrmException> f : futures) {
+        f.checkedGet();
       }
       newChange.setResult(OK);
     } catch (OrmException err) {
@@ -1226,10 +1254,35 @@
       cmd = new ReceiveCommand(ObjectId.zeroId(), c, ps.getRefName());
     }
 
-    void insertChange() throws IOException, OrmException {
+    CheckedFuture<Void, OrmException> insertChange() throws IOException {
       rp.getRevWalk().parseBody(commit);
       warnMalformedMessage(commit);
 
+      final Thread caller = Thread.currentThread();
+      ListenableFuture<Void> future = changeUpdateExector.submit(
+          requestScopePropagator.wrap(new Callable<Void>() {
+        @Override
+        public Void call() throws OrmException {
+          if (caller == Thread.currentThread()) {
+            insertChange(db);
+          } else {
+            ReviewDb db = schemaFactory.open();
+            try {
+              insertChange(db);
+            } finally {
+              db.close();
+            }
+          }
+          synchronized (newProgress) {
+            newProgress.update(1);
+          }
+          return null;
+        }
+      }));
+      return Futures.makeChecked(future, ORM_EXCEPTION);
+    }
+
+    private void insertChange(ReviewDb db) throws OrmException {
       final Account.Id me = currentUser.getAccountId();
       final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
       final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1251,11 +1304,12 @@
 
       db.changes().beginTransaction(change.getId());
       try {
-        insertAncestors(ps.getId(), commit);
+        insertAncestors(db, ps.getId(), commit);
         db.patchSets().insert(Collections.singleton(ps));
         db.changes().insert(Collections.singleton(change));
         ChangeUtil.updateTrackingIds(db, change, trackingFooters, footerLines);
-        approvalsUtil.addReviewers(change, ps, info, reviewers);
+        approvalsUtil.addReviewers(db, change, ps, info,
+            reviewers, Collections.<Account.Id> emptySet());
         db.commit();
       } finally {
         db.rollback();
@@ -1264,7 +1318,6 @@
       created = true;
       replication.fire(project.getNameKey(), ps.getRefName());
       hooks.doPatchsetCreatedHook(change, ps, db);
-      newProgress.update(1);
       workQueue.getDefaultQueue()
           .submit(requestScopePropagator.wrap(new Runnable() {
         @Override
@@ -1509,10 +1562,38 @@
       return true;
     }
 
-    PatchSet.Id insertPatchSet() throws IOException, OrmException {
+    CheckedFuture<PatchSet.Id, OrmException> insertPatchSet()
+        throws IOException {
       rp.getRevWalk().parseBody(newCommit);
       warnMalformedMessage(newCommit);
 
+      final Thread caller = Thread.currentThread();
+      ListenableFuture<PatchSet.Id> future = changeUpdateExector.submit(
+          requestScopePropagator.wrap(new Callable<PatchSet.Id>() {
+        @Override
+        public PatchSet.Id call() throws OrmException {
+          try {
+            if (caller == Thread.currentThread()) {
+              return insertPatchSet(db);
+            } else {
+              ReviewDb db = schemaFactory.open();
+              try {
+                return insertPatchSet(db);
+              } finally {
+                db.close();
+              }
+            }
+          } finally {
+            synchronized (newProgress) {
+              replaceProgress.update(1);
+            }
+          }
+        }
+      }));
+      return Futures.makeChecked(future, ORM_EXCEPTION);
+    }
+
+    PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException {
       final Account.Id me = currentUser.getAccountId();
       final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
       final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1554,7 +1635,7 @@
           return null;
         }
 
-        insertAncestors(newPatchSet.getId(), newCommit);
+        insertAncestors(db, newPatchSet.getId(), newCommit);
         db.patchSets().insert(Collections.singleton(newPatchSet));
 
         if (checkMergedInto) {
@@ -1562,7 +1643,8 @@
           mergedIntoRef = mergedInto != null ? mergedInto.getName() : null;
         }
 
-        List<PatchSetApproval> patchSetApprovals = approvalsUtil.copyVetosToLatestPatchSet(change);
+        List<PatchSetApproval> patchSetApprovals =
+            approvalsUtil.copyVetosToLatestPatchSet(db, change);
 
         final Set<Account.Id> haveApprovals = new HashSet<Account.Id>();
         oldReviewers.clear();
@@ -1577,7 +1659,8 @@
           }
         }
 
-        approvalsUtil.addReviewers(change, newPatchSet, info, reviewers, haveApprovals);
+        approvalsUtil.addReviewers(db, change, newPatchSet, info,
+            reviewers, haveApprovals);
 
         msg =
             new ChangeMessage(new ChangeMessage.Key(change.getId(), ChangeUtil
@@ -1638,7 +1721,6 @@
 
       replication.fire(project.getNameKey(), newPatchSet.getRefName());
       hooks.doPatchsetCreatedHook(change, newPatchSet, db);
-      replaceProgress.update(1);
       if (mergedIntoRef != null) {
         hooks.doChangeMergedHook(
             change, currentUser.getAccount(), newPatchSet, db);
@@ -2042,7 +2124,9 @@
       }
 
       for (final ReplaceRequest req : toClose) {
-        final PatchSet.Id psi = req.validate(true) ? req.insertPatchSet() : null;
+        final PatchSet.Id psi = req.validate(true)
+            ? req.insertPatchSet().checkedGet()
+            : null;
         if (psi != null) {
           closeChange(req.inputCommand, psi, req.newCommit);
           closeProgress.update(1);
@@ -2189,7 +2273,7 @@
     }));
   }
 
-  private void insertAncestors(PatchSet.Id id, RevCommit src)
+  private void insertAncestors(ReviewDb db, PatchSet.Id id, RevCommit src)
       throws OrmException {
     final int cnt = src.getParentCount();
     List<PatchSetAncestor> toInsert = new ArrayList<PatchSetAncestor>(cnt);
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
index 063db2d..1cbd227 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java
@@ -14,15 +14,20 @@
 
 package com.google.gerrit.server.git;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gerrit.server.config.GerritServerConfig;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.gerrit.server.git.WorkQueue.Executor;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 
 import org.eclipse.jgit.lib.Config;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /** Module providing the {@link ReceiveCommitsExecutor}. */
 public class ReceiveCommitsExecutorModule extends AbstractModule {
   @Override
@@ -32,10 +37,31 @@
   @Provides
   @Singleton
   @ReceiveCommitsExecutor
-  public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config,
+  public WorkQueue.Executor createReceiveCommitsExecutor(
+      @GerritServerConfig Config config,
       WorkQueue queues) {
     int poolSize = config.getInt("receive", null, "threadPoolSize",
         Runtime.getRuntime().availableProcessors());
     return queues.createQueue(poolSize, "ReceiveCommits");
   }
+
+  @Provides
+  @Singleton
+  @ChangeUpdateExecutor
+  public ListeningExecutorService createChangeUpdateExecutor(@GerritServerConfig Config config) {
+    int poolSize = config.getInt("receive", null, "changeUpdateThreads", 1);
+    if (poolSize <= 1) {
+      return MoreExecutors.sameThreadExecutor();
+    }
+    return MoreExecutors.listeningDecorator(
+        MoreExecutors.getExitingExecutorService(
+          new ThreadPoolExecutor(1, poolSize,
+              10, TimeUnit.MINUTES,
+              new ArrayBlockingQueue<Runnable>(poolSize),
+              new ThreadFactoryBuilder()
+                .setNameFormat("ChangeUpdate-%d")
+                .setDaemon(true)
+                .build(),
+              new ThreadPoolExecutor.CallerRunsPolicy())));
+  }
 }