Finish NoteDbMigrator

The final phase is converting all changes to NoteDb primary. This is
very straightforward, as PrimaryStorageMigrator was designed to be run
in arbitrary order with arbitrary parallelism.

The only tricky thing is that this phase changes to _NOTE_DB_PRIMARY
before doing any work, then NOTE_DB(_UNFUSED) after. The only way it's
possible to iterate through the main state loop in _PRIMARY is if there
was a failure, in which case we run the failed step again on the next
execution.

This exposed one now-faulty assumption in the NoteDbBatchUpdate
implementations that the specific ReviewDb instance handed to the
constructor has its changes tables disabled. It's possible for the
config read from the NotesMigration to determine which BatchUpdate
implementation to use to differ from the bits used by
NotesMigrationSchemaFactory at context creation time to open a ReviewDb.
There's no harm in removing these overly-conservative checks.

Change-Id: I0a7def4dc7e1da18eb928920b2538225fc6fea57
diff --git a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/OnlineNoteDbMigrationIT.java b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/OnlineNoteDbMigrationIT.java
index 1dc9059..bb72a58 100644
--- a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/OnlineNoteDbMigrationIT.java
+++ b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/OnlineNoteDbMigrationIT.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth8.assertThat;
 import static com.google.common.truth.TruthJUnit.assume;
+import static com.google.gerrit.server.notedb.NotesMigrationState.NOTE_DB_UNFUSED;
 import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_NO_SEQUENCE;
 import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY;
 import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY;
@@ -292,6 +293,50 @@
     }
   }
 
+  @Test
+  public void fullMigration() throws Exception {
+    PushOneCommit.Result r = createChange();
+    Change.Id id = r.getChange().getId();
+
+    migrate(b -> b);
+    assertNotesMigrationState(NOTE_DB_UNFUSED);
+
+    assertThat(sequences.nextChangeId()).isEqualTo(502);
+
+    ObjectId oldMetaId;
+    int rowVersion;
+    try (ReviewDb db = schemaFactory.open();
+        Repository repo = repoManager.openRepository(project)) {
+      Ref ref = repo.exactRef(RefNames.changeMetaRef(id));
+      assertThat(ref).isNotNull();
+      oldMetaId = ref.getObjectId();
+
+      Change c = db.changes().get(id);
+      assertThat(c.getTopic()).isNull();
+      rowVersion = c.getRowVersion();
+      NoteDbChangeState s = NoteDbChangeState.parse(c);
+      assertThat(s.getPrimaryStorage()).isEqualTo(PrimaryStorage.NOTE_DB);
+      assertThat(s.getRefState()).isEmpty();
+    }
+
+    // Do not open a new context, to simulate races with other threads that opened a context earlier
+    // in the migration process; this needs to work.
+    gApi.changes().id(id.get()).topic(name("a-topic"));
+
+    // Of course, it should also work with a new context.
+    resetCurrentApiUser();
+    gApi.changes().id(id.get()).topic(name("another-topic"));
+
+    try (ReviewDb db = schemaFactory.open();
+        Repository repo = repoManager.openRepository(project)) {
+      assertThat(repo.exactRef(RefNames.changeMetaRef(id)).getObjectId()).isNotEqualTo(oldMetaId);
+
+      Change c = db.changes().get(id);
+      assertThat(c.getTopic()).isNull();
+      assertThat(c.getRowVersion()).isEqualTo(rowVersion);
+    }
+  }
+
   private void assertNotesMigrationState(NotesMigrationState expected) throws Exception {
     assertThat(NotesMigrationState.forNotesMigration(notesMigration)).hasValue(expected);
     gerritConfig.load();
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
index 6e8d163..a866314 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
@@ -16,20 +16,21 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.gerrit.reviewdb.server.ReviewDbUtil.unwrapDb;
 import static com.google.gerrit.server.notedb.NotesMigrationState.NOTE_DB_UNFUSED;
 import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_NO_SEQUENCE;
+import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY;
 import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY;
 import static com.google.gerrit.server.notedb.NotesMigrationState.WRITE;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.MultimapBuilder;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SetMultimap;
@@ -53,6 +54,7 @@
 import com.google.gerrit.server.notedb.ConfigNotesMigration;
 import com.google.gerrit.server.notedb.NotesMigration;
 import com.google.gerrit.server.notedb.NotesMigrationState;
+import com.google.gerrit.server.notedb.PrimaryStorageMigrator;
 import com.google.gerrit.server.notedb.RepoSequence;
 import com.google.gerrit.server.notedb.rebuild.ChangeRebuilder.NoPatchSetsException;
 import com.google.gwtorm.server.OrmException;
@@ -93,6 +95,7 @@
     private final ChangeRebuilder rebuilder;
     private final WorkQueue workQueue;
     private final NotesMigration globalNotesMigration;
+    private final PrimaryStorageMigrator primaryStorageMigrator;
 
     private int threads;
     private ImmutableList<Project.NameKey> projects = ImmutableList.of();
@@ -112,7 +115,8 @@
         AllProjectsName allProjects,
         ChangeRebuilder rebuilder,
         WorkQueue workQueue,
-        NotesMigration globalNotesMigration) {
+        NotesMigration globalNotesMigration,
+        PrimaryStorageMigrator primaryStorageMigrator) {
       this.cfg = cfg;
       this.sitePaths = sitePaths;
       this.schemaFactory = schemaFactory;
@@ -121,6 +125,7 @@
       this.rebuilder = rebuilder;
       this.workQueue = workQueue;
       this.globalNotesMigration = globalNotesMigration;
+      this.primaryStorageMigrator = primaryStorageMigrator;
     }
 
     /**
@@ -258,6 +263,7 @@
           allProjects,
           rebuilder,
           globalNotesMigration,
+          primaryStorageMigrator,
           threads > 1
               ? MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "RebuildChange"))
               : MoreExecutors.newDirectExecutorService(),
@@ -277,6 +283,7 @@
   private final AllProjectsName allProjects;
   private final ChangeRebuilder rebuilder;
   private final NotesMigration globalNotesMigration;
+  private final PrimaryStorageMigrator primaryStorageMigrator;
 
   private final ListeningExecutorService executor;
   private final ImmutableList<Project.NameKey> projects;
@@ -294,6 +301,7 @@
       AllProjectsName allProjects,
       ChangeRebuilder rebuilder,
       NotesMigration globalNotesMigration,
+      PrimaryStorageMigrator primaryStorageMigrator,
       ListeningExecutorService executor,
       ImmutableList<Project.NameKey> projects,
       ImmutableList<Change.Id> changes,
@@ -315,6 +323,7 @@
     this.repoManager = repoManager;
     this.allProjects = allProjects;
     this.globalNotesMigration = globalNotesMigration;
+    this.primaryStorageMigrator = primaryStorageMigrator;
     this.gerritConfig = new FileBasedConfig(sitePaths.gerrit_config.toFile(), FS.detect());
     this.executor = executor;
     this.projects = projects;
@@ -387,11 +396,16 @@
             state = rebuildAndEnableReads(state);
             rebuilt = true;
           } else {
-            state = setNoteDbPrimary();
+            state = setNoteDbPrimary(state);
           }
           break;
         case READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY:
-          state = disableReviewDb();
+          // The only way we can get here is if there was a failure on a previous run of
+          // setNoteDbPrimary, since that method moves to NOTE_DB_UNFUSED if it completes
+          // successfully. Assume that not all changes were converted and re-run the step.
+          // migrateToNoteDbPrimary is a relatively fast no-op for already-migrated changes, so this
+          // isn't actually repeating work.
+          state = setNoteDbPrimary(state);
           break;
         case NOTE_DB_UNFUSED:
           // Done!
@@ -435,12 +449,61 @@
     return saveState(prev, READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY);
   }
 
-  private NotesMigrationState setNoteDbPrimary() {
-    throw new UnsupportedOperationException("not yet implemented");
+  private NotesMigrationState setNoteDbPrimary(NotesMigrationState prev)
+      throws MigrationException, OrmException, IOException {
+    checkState(
+        projects.isEmpty() && changes.isEmpty(),
+        "Should not have attempted setNoteDbPrimary with a subset of changes");
+    checkState(
+        prev == READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY
+            || prev == READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY,
+        "Unexpected start state for setNoteDbPrimary: %s",
+        prev);
+
+    // Before changing the primary storage of old changes, ensure new changes are created with
+    // NoteDb primary.
+    prev = saveState(prev, READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY);
+
+    Stopwatch sw = Stopwatch.createStarted();
+    log.info("Setting primary storage to NoteDb");
+    List<Change.Id> allChanges;
+    try (ReviewDb db = unwrapDb(schemaFactory.open())) {
+      allChanges = Streams.stream(db.changes().all()).map(Change::getId).collect(toList());
+    }
+
+    List<ListenableFuture<Boolean>> futures =
+        allChanges
+            .stream()
+            .map(
+                id ->
+                    executor.submit(
+                        () -> {
+                          // TODO(dborowitz): Avoid reopening db if using a single thread.
+                          try (ReviewDb db = unwrapDb(schemaFactory.open())) {
+                            primaryStorageMigrator.migrateToNoteDbPrimary(id);
+                            return true;
+                          } catch (Exception e) {
+                            log.error("Error migrating primary storage for " + id, e);
+                            return false;
+                          }
+                        }))
+            .collect(toList());
+
+    boolean ok = futuresToBoolean(futures, "Error migrating primary storage");
+    double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
+    log.info(
+        String.format(
+            "Migrated primary storage of %d changes in %.01fs (%.01f/s)\n",
+            allChanges.size(), t, allChanges.size() / t));
+    if (!ok) {
+      throw new MigrationException("Migrating primary storage for some changes failed, see log");
+    }
+
+    return disableReviewDb(prev);
   }
 
-  private NotesMigrationState disableReviewDb() {
-    throw new UnsupportedOperationException("not yet implemented");
+  private NotesMigrationState disableReviewDb(NotesMigrationState prev) throws IOException {
+    return saveState(prev, NOTE_DB_UNFUSED);
   }
 
   private Optional<NotesMigrationState> loadState() throws IOException {
@@ -485,7 +548,6 @@
     if (!globalNotesMigration.commitChangeWrites()) {
       throw new MigrationException("Cannot rebuild without noteDb.changes.write=true");
     }
-    boolean ok;
     Stopwatch sw = Stopwatch.createStarted();
     log.info("Rebuilding changes in NoteDb");
 
@@ -507,13 +569,7 @@
       futures.add(future);
     }
 
-    try {
-      ok = Iterables.all(Futures.allAsList(futures).get(), Predicates.equalTo(true));
-    } catch (InterruptedException | ExecutionException e) {
-      log.error("Error rebuilding projects", e);
-      ok = false;
-    }
-
+    boolean ok = futuresToBoolean(futures, "Error rebuilding projects");
     double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
     log.info(
         String.format(
@@ -593,4 +649,13 @@
     }
     return ok;
   }
+
+  private static boolean futuresToBoolean(List<ListenableFuture<Boolean>> futures, String errMsg) {
+    try {
+      return Futures.allAsList(futures).get().stream().allMatch(b -> b);
+    } catch (InterruptedException | ExecutionException e) {
+      log.error(errMsg, e);
+      return false;
+    }
+  }
 }
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/update/FusedNoteDbBatchUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/update/FusedNoteDbBatchUpdate.java
index b979636..7db5e43 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/update/FusedNoteDbBatchUpdate.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/update/FusedNoteDbBatchUpdate.java
@@ -286,7 +286,6 @@
       @Assisted CurrentUser user,
       @Assisted Timestamp when) {
     super(repoManager, serverIdent, project, user, when);
-    checkArgument(!db.changesTablesEnabled(), "expected Change tables to be disabled on %s", db);
     this.changeNotesFactory = changeNotesFactory;
     this.changeControlFactory = changeControlFactory;
     this.changeUpdateFactory = changeUpdateFactory;
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/update/UnfusedNoteDbBatchUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/update/UnfusedNoteDbBatchUpdate.java
index b5c7256..ce96c0e 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/update/UnfusedNoteDbBatchUpdate.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/update/UnfusedNoteDbBatchUpdate.java
@@ -14,7 +14,6 @@
 
 package com.google.gerrit.server.update;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Comparator.comparing;
 import static java.util.stream.Collectors.toList;
@@ -267,7 +266,6 @@
       @Assisted CurrentUser user,
       @Assisted Timestamp when) {
     super(repoManager, serverIdent, project, user, when);
-    checkArgument(!db.changesTablesEnabled(), "expected Change tables to be disabled on %s", db);
     this.changeNotesFactory = changeNotesFactory;
     this.changeControlFactory = changeControlFactory;
     this.changeUpdateFactory = changeUpdateFactory;