NoteDbMigrator: Use one NoteDbUpdateManager per project
We want to avoid writing out thousands of loose objects during
migration by using a custom ObjectInserter implementation. This wouldn't
do any good if NoteDbMigrator created a new NoteDbUpdateManager with a
new ObjectInserter for each change. Hoist manager creation into
NoteDbMigrator instead, even though this requires repeating a bit of
logic that previously lived in ChangeRebuilderImpl.
The old implementation of NoteDbUpdateManager unconditionally buffered
in memory all objects that were inserted, so it could expose them in the
StagedResult used by the auto-rebuilding and dry-run codepaths. This
behavior is reasonable when modifying a small number of changes, but
would not be appropriate for a batch migration where the manager
is used to update thousands of changes in a repository. Fortunately, the
InsertedObjects aren't needed in the migration codepath, so we can just
add a flag to NoteDbUpdateManager to turn this behavior off.
Similarly, plumb a flag to NoteDbUpdateManager to turn off atomic ref
updates. In an online migration, we fully expect some changes to be
modified concurrently, which is already handled by later
auto-rebuilding. No need to block a single giant BatchRefUpdate on such
changes.
Change-Id: I32446fd17d6a2b0cc8b30bd1164402580d7e0aa2
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/InMemoryInserter.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/InMemoryInserter.java
index 9c43bdb..b80f846 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/InMemoryInserter.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/InMemoryInserter.java
@@ -95,6 +95,10 @@
return ImmutableList.copyOf(inserted.values());
}
+ public int getInsertedObjectCount() {
+ return inserted.values().size();
+ }
+
public void clear() {
inserted.clear();
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeNotes.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeNotes.java
index c9a28f4..7799d2d 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -779,6 +779,7 @@
rebuildResult = checkNotNull(r);
checkNotNull(r.newState());
checkNotNull(r.staged());
+ checkNotNull(r.staged().changeObjects());
return LoadHandle.create(
ChangeNotesCommit.newStagedRevWalk(repo, r.staged().changeObjects()),
r.newState().getChangeMetaId());
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
index ea1f891..3f20766 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
@@ -98,13 +98,13 @@
ImmutableList<InsertedObject> changeObjects = ImmutableList.of();
if (changeRepo != null) {
changeCommands = changeRepo.getCommandsSnapshot();
- changeObjects = changeRepo.tempIns.getInsertedObjects();
+ changeObjects = changeRepo.getInsertedObjects();
}
ImmutableList<ReceiveCommand> allUsersCommands = ImmutableList.of();
ImmutableList<InsertedObject> allUsersObjects = ImmutableList.of();
if (allUsersRepo != null) {
allUsersCommands = allUsersRepo.getCommandsSnapshot();
- allUsersObjects = allUsersRepo.tempIns.getInsertedObjects();
+ allUsersObjects = allUsersRepo.getInsertedObjects();
}
return new AutoValue_NoteDbUpdateManager_StagedResult(
id, delta,
@@ -119,10 +119,32 @@
public abstract ImmutableList<ReceiveCommand> changeCommands();
+ /**
+ * Objects inserted into the change repo for this change.
+ *
+ * <p>Includes all objects inserted for any change in this repo that may have been processed by
+ * the corresponding {@link NoteDbUpdateManager} instance, not just those objects that were
+ * inserted to handle this specific change's updates.
+ *
+ * @return inserted objects, or null if the corresponding {@link NoteDbUpdateManager} was
+ * configured not to {@link NoteDbUpdateManager#setSaveObjects(boolean) save objects}.
+ */
+ @Nullable
public abstract ImmutableList<InsertedObject> changeObjects();
public abstract ImmutableList<ReceiveCommand> allUsersCommands();
+ /**
+ * Objects inserted into the All-Users repo for this change.
+ *
+ * <p>Includes all objects inserted into All-Users for any change that may have been processed
+ * by the corresponding {@link NoteDbUpdateManager} instance, not just those objects that were
+ * inserted to handle this specific change's updates.
+ *
+ * @return inserted objects, or null if the corresponding {@link NoteDbUpdateManager} was
+ * configured not to {@link NoteDbUpdateManager#setSaveObjects(boolean) save objects}.
+ */
+ @Nullable
public abstract ImmutableList<InsertedObject> allUsersObjects();
}
@@ -144,17 +166,20 @@
public final RevWalk rw;
public final ChainedReceiveCommands cmds;
- private final InMemoryInserter tempIns;
+ private final InMemoryInserter inMemIns;
+ private final ObjectInserter tempIns;
@Nullable private final ObjectInserter finalIns;
private final boolean close;
+ private final boolean saveObjects;
private OpenRepo(
Repository repo,
RevWalk rw,
@Nullable ObjectInserter ins,
ChainedReceiveCommands cmds,
- boolean close) {
+ boolean close,
+ boolean saveObjects) {
ObjectReader reader = rw.getObjectReader();
checkArgument(
ins == null || reader.getCreatedFromInserter() == ins,
@@ -162,11 +187,21 @@
ins,
reader.getCreatedFromInserter());
this.repo = checkNotNull(repo);
- this.tempIns = new InMemoryInserter(rw.getObjectReader());
+
+ if (saveObjects) {
+ this.inMemIns = new InMemoryInserter(rw.getObjectReader());
+ this.tempIns = inMemIns;
+ } else {
+ checkArgument(ins != null);
+ this.inMemIns = null;
+ this.tempIns = ins;
+ }
+
this.rw = new RevWalk(tempIns.newReader());
this.finalIns = ins;
this.cmds = checkNotNull(cmds);
this.close = close;
+ this.saveObjects = saveObjects;
}
public Optional<ObjectId> getObjectId(String refName) throws IOException {
@@ -177,17 +212,25 @@
return ImmutableList.copyOf(cmds.getCommands().values());
}
+ @Nullable
+ ImmutableList<InsertedObject> getInsertedObjects() {
+ return saveObjects ? inMemIns.getInsertedObjects() : null;
+ }
+
void flush() throws IOException {
flushToFinalInserter();
finalIns.flush();
}
void flushToFinalInserter() throws IOException {
+ if (!saveObjects) {
+ return;
+ }
checkState(finalIns != null);
- for (InsertedObject obj : tempIns.getInsertedObjects()) {
+ for (InsertedObject obj : inMemIns.getInsertedObjects()) {
finalIns.insert(obj.type(), obj.data().toByteArray());
}
- tempIns.clear();
+ inMemIns.clear();
}
@Override
@@ -219,6 +262,8 @@
private OpenRepo allUsersRepo;
private Map<Change.Id, StagedResult> staged;
private boolean checkExpectedState = true;
+ private boolean saveObjects = true;
+ private boolean atomicRefUpdates = true;
private String refLogMessage;
private PersonIdent refLogIdent;
private PushCertificate pushCert;
@@ -264,14 +309,14 @@
public NoteDbUpdateManager setChangeRepo(
Repository repo, RevWalk rw, @Nullable ObjectInserter ins, ChainedReceiveCommands cmds) {
checkState(changeRepo == null, "change repo already initialized");
- changeRepo = new OpenRepo(repo, rw, ins, cmds, false);
+ changeRepo = new OpenRepo(repo, rw, ins, cmds, false, true);
return this;
}
public NoteDbUpdateManager setAllUsersRepo(
Repository repo, RevWalk rw, @Nullable ObjectInserter ins, ChainedReceiveCommands cmds) {
checkState(allUsersRepo == null, "All-Users repo already initialized");
- allUsersRepo = new OpenRepo(repo, rw, ins, cmds, false);
+ allUsersRepo = new OpenRepo(repo, rw, ins, cmds, false, true);
return this;
}
@@ -280,6 +325,37 @@
return this;
}
+ /**
+ * Set whether to save objects and make them available in {@link StagedResult}s.
+ *
+ * <p>If set, all objects inserted into all repos managed by this instance will be buffered in
+ * memory, and the {@link StagedResult}s will return non-null lists from {@link
+ * StagedResult#changeObjects()} and {@link StagedResult#allUsersObjects()}.
+ *
+ * <p>Not recommended if modifying a large number of changes with a single manager.
+ *
+ * @param saveObjects whether to save objects; defaults to true.
+ * @return this
+ */
+ public NoteDbUpdateManager setSaveObjects(boolean saveObjects) {
+ this.saveObjects = saveObjects;
+ return this;
+ }
+
+ /**
+ * Set whether to use atomic ref updates.
+ *
+ * <p>Can be set to false when the change updates represented by this manager aren't logically
+ * related, e.g. when the updater is only used to group objects together with a single inserter.
+ *
+ * @param atomicRefUpdates whether to use atomic ref updates; defaults to true.
+ * @return this
+ */
+ public NoteDbUpdateManager setAtomicRefUpdates(boolean atomicRefUpdates) {
+ this.atomicRefUpdates = atomicRefUpdates;
+ return this;
+ }
+
public NoteDbUpdateManager setRefLogMessage(String message) {
this.refLogMessage = message;
return this;
@@ -336,7 +412,7 @@
ObjectInserter ins = repo.newObjectInserter(); // Closed by OpenRepo#close.
ObjectReader reader = ins.newReader(); // Not closed by OpenRepo#close.
try (RevWalk rw = new RevWalk(reader)) { // Doesn't escape OpenRepo constructor.
- return new OpenRepo(repo, rw, ins, new ChainedReceiveCommands(repo), true) {
+ return new OpenRepo(repo, rw, ins, new ChainedReceiveCommands(repo), true, saveObjects) {
@Override
public void close() {
reader.close();
@@ -543,6 +619,7 @@
} else {
// OpenRepo buffers objects separately; caller may assume that objects are available in the
// inserter it previously passed via setChangeRepo.
+ checkState(saveObjects, "cannot use dryrun with saveObjects = false");
or.flushToFinalInserter();
}
@@ -554,6 +631,7 @@
bru.setRefLogMessage(firstNonNull(guessRestApiHandler(), "Update NoteDb refs"), false);
}
bru.setRefLogIdent(refLogIdent != null ? refLogIdent : serverIdent.get());
+ bru.setAtomic(atomicRefUpdates);
or.cmds.addTo(bru);
bru.setAllowNonFastForwards(true);
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/ChangeRebuilderImpl.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/ChangeRebuilderImpl.java
index 166d8a9..a28d9c5 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/ChangeRebuilderImpl.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/ChangeRebuilderImpl.java
@@ -187,7 +187,7 @@
}
try (NoteDbUpdateManager manager = updateManagerFactory.create(change.getProject())) {
buildUpdates(manager, bundleReader.fromReviewDb(db, changeId));
- return execute(db, changeId, manager, checkReadOnly);
+ return execute(db, changeId, manager, checkReadOnly, true);
}
}
@@ -216,11 +216,15 @@
@Override
public Result execute(ReviewDb db, Change.Id changeId, NoteDbUpdateManager manager)
throws OrmException, IOException {
- return execute(db, changeId, manager, true);
+ return execute(db, changeId, manager, true, true);
}
public Result execute(
- ReviewDb db, Change.Id changeId, NoteDbUpdateManager manager, boolean checkReadOnly)
+ ReviewDb db,
+ Change.Id changeId,
+ NoteDbUpdateManager manager,
+ boolean checkReadOnly,
+ boolean executeManager)
throws OrmException, IOException {
db = ReviewDbUtil.unwrapDb(db);
Change change = checkNoteDbState(ChangeNotes.readOneReviewDbChange(db, changeId));
@@ -277,11 +281,13 @@
// to the caller so they know to use the staged results instead of reading from the repo.
throw new OrmException(NoteDbUpdateManager.CHANGES_READ_ONLY);
}
- manager.execute();
+ if (executeManager) {
+ manager.execute();
+ }
return r;
}
- private static Change checkNoteDbState(Change c) throws OrmException {
+ static Change checkNoteDbState(Change c) throws OrmException {
// Can only rebuild a change if its primary storage is ReviewDb.
NoteDbChangeState s = NoteDbChangeState.parse(c);
if (s != null && s.getPrimaryStorage() != PrimaryStorage.REVIEW_DB) {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
index add6d57..3f72ee5 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/rebuild/NoteDbMigrator.java
@@ -26,6 +26,7 @@
import static com.google.gerrit.server.notedb.NotesMigrationState.WRITE;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import com.google.common.annotations.VisibleForTesting;
@@ -56,12 +57,16 @@
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.LockFailureException;
import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.notedb.ChangeBundleReader;
+import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.MutableNotesMigration;
import com.google.gerrit.server.notedb.NoteDbTable;
+import com.google.gerrit.server.notedb.NoteDbUpdateManager;
import com.google.gerrit.server.notedb.NotesMigrationState;
import com.google.gerrit.server.notedb.PrimaryStorageMigrator;
import com.google.gerrit.server.notedb.RepoSequence;
import com.google.gerrit.server.notedb.rebuild.ChangeRebuilder.NoPatchSetsException;
+import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gwtorm.server.OrmException;
@@ -74,8 +79,10 @@
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -119,10 +126,12 @@
private final SitePaths sitePaths;
private final SchemaFactory<ReviewDb> schemaFactory;
private final GitRepositoryManager repoManager;
+ private final NoteDbUpdateManager.Factory updateManagerFactory;
+ private final ChangeBundleReader bundleReader;
private final AllProjectsName allProjects;
private final InternalUser.Factory userFactory;
private final ThreadLocalRequestContext requestContext;
- private final ChangeRebuilder rebuilder;
+ private final ChangeRebuilderImpl rebuilder;
private final WorkQueue workQueue;
private final MutableNotesMigration globalNotesMigration;
private final PrimaryStorageMigrator primaryStorageMigrator;
@@ -144,10 +153,12 @@
SitePaths sitePaths,
SchemaFactory<ReviewDb> schemaFactory,
GitRepositoryManager repoManager,
+ NoteDbUpdateManager.Factory updateManagerFactory,
+ ChangeBundleReader bundleReader,
AllProjectsName allProjects,
ThreadLocalRequestContext requestContext,
InternalUser.Factory userFactory,
- ChangeRebuilder rebuilder,
+ ChangeRebuilderImpl rebuilder,
WorkQueue workQueue,
MutableNotesMigration globalNotesMigration,
PrimaryStorageMigrator primaryStorageMigrator,
@@ -159,6 +170,8 @@
this.sitePaths = sitePaths;
this.schemaFactory = schemaFactory;
this.repoManager = repoManager;
+ this.updateManagerFactory = updateManagerFactory;
+ this.bundleReader = bundleReader;
this.allProjects = allProjects;
this.requestContext = requestContext;
this.userFactory = userFactory;
@@ -318,6 +331,8 @@
sitePaths,
schemaFactory,
repoManager,
+ updateManagerFactory,
+ bundleReader,
allProjects,
requestContext,
userFactory,
@@ -343,10 +358,12 @@
private final FileBasedConfig noteDbConfig;
private final SchemaFactory<ReviewDb> schemaFactory;
private final GitRepositoryManager repoManager;
+ private final NoteDbUpdateManager.Factory updateManagerFactory;
+ private final ChangeBundleReader bundleReader;
private final AllProjectsName allProjects;
private final ThreadLocalRequestContext requestContext;
private final InternalUser.Factory userFactory;
- private final ChangeRebuilder rebuilder;
+ private final ChangeRebuilderImpl rebuilder;
private final MutableNotesMigration globalNotesMigration;
private final PrimaryStorageMigrator primaryStorageMigrator;
private final DynamicSet<NotesMigrationStateListener> listeners;
@@ -365,10 +382,12 @@
SitePaths sitePaths,
SchemaFactory<ReviewDb> schemaFactory,
GitRepositoryManager repoManager,
+ NoteDbUpdateManager.Factory updateManagerFactory,
+ ChangeBundleReader bundleReader,
AllProjectsName allProjects,
ThreadLocalRequestContext requestContext,
InternalUser.Factory userFactory,
- ChangeRebuilder rebuilder,
+ ChangeRebuilderImpl rebuilder,
MutableNotesMigration globalNotesMigration,
PrimaryStorageMigrator primaryStorageMigrator,
DynamicSet<NotesMigrationStateListener> listeners,
@@ -392,6 +411,8 @@
this.schemaFactory = schemaFactory;
this.rebuilder = rebuilder;
this.repoManager = repoManager;
+ this.updateManagerFactory = updateManagerFactory;
+ this.bundleReader = bundleReader;
this.allProjects = allProjects;
this.requestContext = requestContext;
this.userFactory = userFactory;
@@ -729,43 +750,85 @@
ProgressMonitor pm =
new TextProgressMonitor(
new PrintWriter(new BufferedWriter(new OutputStreamWriter(progressOut, UTF_8))));
- pm.beginTask(FormatUtil.elide(project.get(), 50), allChanges.get(project).size());
- try {
+ try (NoteDbUpdateManager manager =
+ updateManagerFactory.create(project).setSaveObjects(false).setAtomicRefUpdates(false)) {
+ Set<Change.Id> skipExecute = new HashSet<>();
Collection<Change.Id> changes = allChanges.get(project);
- for (Change.Id changeId : changes) {
- // Update one change at a time, which ends up creating one NoteDbUpdateManager per change as
- // well. This turns out to be no more expensive than batching, since each NoteDb operation
- // is only writing single loose ref updates and loose objects. Plus we have to do one
- // ReviewDb transaction per change due to the AtomicUpdate, so if we somehow batched NoteDb
- // operations, ReviewDb would become the bottleneck.
- try {
- rebuilder.rebuild(db, changeId);
- } catch (NoPatchSetsException e) {
- log.warn(e.getMessage());
- } catch (RepositoryNotFoundException e) {
- log.warn("Repository {} not found while rebuilding change {}", project, changeId);
- } catch (ConflictingUpdateException e) {
- log.warn(
- "Rebuilding detected a conflicting ReviewDb update for change {};"
- + " will be auto-rebuilt at runtime",
- changeId);
- } catch (LockFailureException e) {
- log.warn(
- "Rebuilding detected a conflicting NoteDb update for change {};"
- + " will be auto-rebuilt at runtime",
- changeId);
- } catch (Throwable t) {
- log.error("Failed to rebuild change " + changeId, t);
- ok = false;
+ pm.beginTask(FormatUtil.elide("Rebuilding " + project.get(), 50), changes.size());
+ try {
+ for (Change.Id changeId : changes) {
+ boolean staged = false;
+ try {
+ stage(db, changeId, manager);
+ staged = true;
+ } catch (NoPatchSetsException e) {
+ log.warn(e.getMessage());
+ } catch (RepositoryNotFoundException e) {
+ log.warn("Repository {} not found while rebuilding change {}", project, changeId);
+ } catch (Throwable t) {
+ log.error("Failed to rebuild change " + changeId, t);
+ ok = false;
+ }
+ pm.update(1);
+ if (!staged) {
+ skipExecute.add(changeId);
+ }
}
- pm.update(1);
+ } finally {
+ pm.endTask();
}
- } finally {
- pm.endTask();
+
+ pm.beginTask(
+ FormatUtil.elide("Saving " + project.get(), 50), changes.size() - skipExecute.size());
+ try {
+ for (Change.Id changeId : changes) {
+ if (skipExecute.contains(changeId)) {
+ continue;
+ }
+ try {
+ rebuilder.execute(db, changeId, manager, true, false);
+ } catch (ConflictingUpdateException e) {
+ log.warn(
+ "Rebuilding detected a conflicting ReviewDb update for change {};"
+ + " will be auto-rebuilt at runtime",
+ changeId);
+ } catch (Throwable t) {
+ log.error("Failed to rebuild change " + changeId, t);
+ ok = false;
+ }
+ pm.update(1);
+ }
+ } finally {
+ pm.endTask();
+ }
+
+ try {
+ manager.execute();
+ } catch (LockFailureException e) {
+ log.warn(
+ "Rebuilding detected a conflicting NoteDb update for the following refs, which will"
+ + " be auto-rebuilt at runtime: {}",
+ e.getFailedRefs().stream().distinct().sorted().collect(joining(", ")));
+ } catch (OrmException | IOException e) {
+ log.error("Failed to save NoteDb state for " + project, e);
+ }
}
return ok;
}
+ private void stage(ReviewDb db, Change.Id changeId, NoteDbUpdateManager manager)
+ throws OrmException, IOException {
+ // Match ChangeRebuilderImpl#stage, but without calling manager.stage(), since that can only be
+ // called after building updates for all changes.
+ Change change =
+ ChangeRebuilderImpl.checkNoteDbState(ChangeNotes.readOneReviewDbChange(db, changeId));
+ if (change == null) {
+ // Could log here instead, but this matches the behavior of ChangeRebuilderImpl#stage.
+ throw new NoSuchChangeException(changeId);
+ }
+ rebuilder.buildUpdates(manager, bundleReader.fromReviewDb(db, changeId));
+ }
+
private static boolean futuresToBoolean(List<ListenableFuture<Boolean>> futures, String errMsg) {
try {
return Futures.allAsList(futures).get().stream().allMatch(b -> b);