BatchUpdate: Add a metric for executeChangeOps latency

For a NoteDb migration (specifically, a revamp of If177aa0b), we want
to know the length of the race window between when a change's
noteDbChangeState is read from ReviewDb and the corresponding NoteDb
update completes. Since opening a transaction on a change is basically
the first I/O operation that happens within executeChangeOps, and
writing to NoteDb is basically the last, the simplest and most easily
understandable thing to do is just to record the latency of
executeChangeOps.

There are a several things that may make this an overestimate of this
race windows, such as scheduling latency for the first background
ChangeTask to start, but in general an overestimate of a race window
is not as problematic as an underestimate.

Change-Id: I8a340da132bae49a2a61f61c6641ba18e98d0791
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/BatchUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/BatchUpdate.java
index 2bd30be..8a41ebc 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/BatchUpdate.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/BatchUpdate.java
@@ -18,7 +18,9 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Comparator.comparing;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ListMultimap;
@@ -32,6 +34,11 @@
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Description.Units;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.reviewdb.client.PatchSet;
@@ -58,6 +65,8 @@
 import com.google.gwtorm.server.OrmConcurrencyException;
 import com.google.gwtorm.server.OrmException;
 import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 
@@ -341,6 +350,22 @@
     }
   }
 
+  @Singleton
+  private static class Metrics {
+    final Timer1<Boolean> executeChangeOpsLatency;
+
+    @Inject
+    Metrics(MetricMaker metricMaker) {
+      executeChangeOpsLatency = metricMaker.newTimer(
+          "batch_update/execute_change_ops",
+          new Description(
+                  "BatchUpdate change update latency, excluding reindexing")
+              .setCumulative()
+              .setUnit(Units.MILLISECONDS),
+          Field.ofBoolean("success"));
+    }
+  }
+
   private static Order getOrder(Collection<BatchUpdate> updates) {
     Order o = null;
     for (BatchUpdate u : updates) {
@@ -401,13 +426,15 @@
           }
           listener.afterRefUpdates();
           for (BatchUpdate u : updates) {
-            u.executeChangeOps(updateChangesInParallel, dryrun);
+            u.reindexChanges(
+              u.executeChangeOps(updateChangesInParallel, dryrun));
           }
           listener.afterUpdateChanges();
           break;
         case DB_BEFORE_REPO:
           for (BatchUpdate u : updates) {
-            u.executeChangeOps(updateChangesInParallel, dryrun);
+            u.reindexChanges(
+                u.executeChangeOps(updateChangesInParallel, dryrun));
           }
           listener.afterUpdateChanges();
           for (BatchUpdate u : updates) {
@@ -476,6 +503,7 @@
   private final GitReferenceUpdated gitRefUpdated;
   private final GitRepositoryManager repoManager;
   private final ListeningExecutorService changeUpdateExector;
+  private final Metrics metrics;
   private final NoteDbUpdateManager.Factory updateManagerFactory;
   private final NotesMigration notesMigration;
   private final ReviewDb db;
@@ -514,6 +542,7 @@
       @GerritPersonIdent PersonIdent serverIdent,
       GitReferenceUpdated gitRefUpdated,
       GitRepositoryManager repoManager,
+      Metrics metrics,
       NoteDbUpdateManager.Factory updateManagerFactory,
       NotesMigration notesMigration,
       SchemaFactory<ReviewDb> schemaFactory,
@@ -528,6 +557,7 @@
     this.changeUpdateFactory = changeUpdateFactory;
     this.gitRefUpdated = gitRefUpdated;
     this.indexer = indexer;
+    this.metrics = metrics;
     this.notesMigration = notesMigration;
     this.repoManager = repoManager;
     this.schemaFactory = schemaFactory;
@@ -697,58 +727,71 @@
     }
   }
 
-  private void executeChangeOps(boolean parallel,
+  private List<ChangeTask> executeChangeOps(boolean parallel,
       boolean dryrun) throws UpdateException,
       RestApiException {
-    logDebug("Executing change ops (parallel? {})", parallel);
-    ListeningExecutorService executor = parallel
-        ? changeUpdateExector
-        : MoreExecutors.newDirectExecutorService();
-
-    List<ChangeTask> tasks = new ArrayList<>(ops.keySet().size());
+    List<ChangeTask> tasks;
+    boolean success = false;
+    Stopwatch sw = Stopwatch.createStarted();
     try {
-      if (notesMigration.commitChangeWrites() && repo != null) {
-        // A NoteDb change may have been rebuilt since the repo was originally
-        // opened, so make sure we see that.
-        logDebug("Preemptively scanning for repo changes");
-        repo.scanForRepoChanges();
-      }
-      if (!ops.isEmpty() && notesMigration.failChangeWrites()) {
-        // Fail fast before attempting any writes if changes are read-only, as
-        // this is a programmer error.
-        logDebug("Failing early due to read-only Changes table");
-        throw new OrmException(NoteDbUpdateManager.CHANGES_READ_ONLY);
-      }
-      List<ListenableFuture<?>> futures = new ArrayList<>(ops.keySet().size());
-      for (Map.Entry<Change.Id, Collection<Op>> e : ops.asMap().entrySet()) {
-        ChangeTask task =
-            new ChangeTask(e.getKey(), e.getValue(), Thread.currentThread(),
-                dryrun);
-        tasks.add(task);
-        if (!parallel) {
-          logDebug("Direct execution of task for ops: {}", ops);
-        }
-        futures.add(executor.submit(task));
-      }
-      if (parallel) {
-        logDebug("Waiting on futures for {} ops spanning {} changes",
-            ops.size(), ops.keySet().size());
-      }
-      Futures.allAsList(futures).get();
+      logDebug("Executing change ops (parallel? {})", parallel);
+      ListeningExecutorService executor = parallel
+          ? changeUpdateExector
+          : MoreExecutors.newDirectExecutorService();
 
-      if (notesMigration.commitChangeWrites()) {
-        if (!dryrun) {
-          executeNoteDbUpdates(tasks);
+      tasks = new ArrayList<>(ops.keySet().size());
+      try {
+        if (notesMigration.commitChangeWrites() && repo != null) {
+          // A NoteDb change may have been rebuilt since the repo was originally
+          // opened, so make sure we see that.
+          logDebug("Preemptively scanning for repo changes");
+          repo.scanForRepoChanges();
         }
+        if (!ops.isEmpty() && notesMigration.failChangeWrites()) {
+          // Fail fast before attempting any writes if changes are read-only, as
+          // this is a programmer error.
+          logDebug("Failing early due to read-only Changes table");
+          throw new OrmException(NoteDbUpdateManager.CHANGES_READ_ONLY);
+        }
+        List<ListenableFuture<?>> futures =
+            new ArrayList<>(ops.keySet().size());
+        for (Map.Entry<Change.Id, Collection<Op>> e : ops.asMap().entrySet()) {
+          ChangeTask task =
+              new ChangeTask(e.getKey(), e.getValue(), Thread.currentThread(),
+                  dryrun);
+          tasks.add(task);
+          if (!parallel) {
+            logDebug("Direct execution of task for ops: {}", ops);
+          }
+          futures.add(executor.submit(task));
+        }
+        if (parallel) {
+          logDebug("Waiting on futures for {} ops spanning {} changes",
+              ops.size(), ops.keySet().size());
+        }
+        Futures.allAsList(futures).get();
+
+        if (notesMigration.commitChangeWrites()) {
+          if (!dryrun) {
+            executeNoteDbUpdates(tasks);
+          }
+        }
+        success = true;
+      } catch (ExecutionException | InterruptedException e) {
+        Throwables.propagateIfInstanceOf(e.getCause(), UpdateException.class);
+        Throwables.propagateIfInstanceOf(e.getCause(), RestApiException.class);
+        throw new UpdateException(e);
+      } catch (OrmException | IOException e) {
+        throw new UpdateException(e);
       }
-    } catch (ExecutionException | InterruptedException e) {
-      Throwables.propagateIfInstanceOf(e.getCause(), UpdateException.class);
-      Throwables.propagateIfInstanceOf(e.getCause(), RestApiException.class);
-      throw new UpdateException(e);
-    } catch (OrmException | IOException e) {
-      throw new UpdateException(e);
+    } finally {
+      metrics.executeChangeOpsLatency.record(
+          success, sw.elapsed(NANOSECONDS), NANOSECONDS);
     }
+    return tasks;
+  }
 
+  private void reindexChanges(List<ChangeTask> tasks) {
     // Reindex changes.
     for (ChangeTask task : tasks) {
       if (task.deleted) {