blob: d5ee14396b973ac9a7c7849c3dcec1336562cbed [file] [log] [blame]
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.approval;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.git.RefUpdateUtil;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.InternalUser;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.UpdateException;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.internal.storage.file.RefDirectory;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefDatabase;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.ReceiveCommand;
public class RecursiveApprovalCopier {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int SLICE_MAX_REFS = 1000;
private final BatchUpdate.Factory batchUpdateFactory;
private final GitRepositoryManager repositoryManager;
private final InternalUser.Factory internalUserFactory;
private final ApprovalsUtil approvalsUtil;
private final ChangeNotes.Factory changeNotesFactory;
private final GitReferenceUpdated gitRefUpdated;
private final ListeningExecutorService executor;
private final ConcurrentHashMap<Project.NameKey, List<ReceiveCommand>> pendingRefUpdates =
new ConcurrentHashMap<>();
private volatile boolean failedForAtLeastOneProject;
private final AtomicInteger totalCopyApprovalsTasks = new AtomicInteger();
private final AtomicInteger finishedCopyApprovalsTasks = new AtomicInteger();
private final AtomicInteger totalRefUpdates = new AtomicInteger();
private final AtomicInteger finishedRefUpdates = new AtomicInteger();
@Inject
public RecursiveApprovalCopier(
BatchUpdate.Factory batchUpdateFactory,
GitRepositoryManager repositoryManager,
InternalUser.Factory internalUserFactory,
ApprovalsUtil approvalsUtil,
ChangeNotes.Factory changeNotesFactory,
GitReferenceUpdated gitRefUpdated,
@FanOutExecutor ExecutorService executor) {
this.batchUpdateFactory = batchUpdateFactory;
this.repositoryManager = repositoryManager;
this.internalUserFactory = internalUserFactory;
this.approvalsUtil = approvalsUtil;
this.changeNotesFactory = changeNotesFactory;
this.gitRefUpdated = gitRefUpdated;
this.executor = MoreExecutors.listeningDecorator(executor);
}
/**
* This method assumes it is used as a standalone program having exclusive access to the Git
* repositories. Therefore, it will (safely) skip locking of the loose refs when performing batch
* ref-updates.
*/
public void persistStandalone()
throws RepositoryNotFoundException, IOException, InterruptedException, ExecutionException {
persist(repositoryManager.list(), null, false);
if (failedForAtLeastOneProject) {
throw new RuntimeException("There were errors, check the logs for details");
}
}
public void persist(Project.NameKey project, @Nullable Consumer<Change> labelsCopiedListener)
throws IOException, RepositoryNotFoundException, InterruptedException, ExecutionException {
persist(ImmutableList.of(project), labelsCopiedListener, true);
}
private void persist(
Collection<Project.NameKey> projects,
@Nullable Consumer<Change> labelsCopiedListener,
boolean shouldLockLooseRefs)
throws InterruptedException, ExecutionException, RepositoryNotFoundException, IOException {
List<ListenableFuture<Void>> copyApprovalsTasks = new LinkedList<>();
for (Project.NameKey project : projects) {
copyApprovalsTasks.addAll(submitCopyApprovalsTasks(project, labelsCopiedListener));
}
Futures.successfulAsList(copyApprovalsTasks).get();
List<ListenableFuture<Void>> batchRefUpdateTasks =
submitBatchRefUpdateTasks(shouldLockLooseRefs);
Futures.successfulAsList(batchRefUpdateTasks).get();
}
private List<ListenableFuture<Void>> submitCopyApprovalsTasks(
Project.NameKey project, @Nullable Consumer<Change> labelsCopiedListener)
throws RepositoryNotFoundException, IOException {
List<ListenableFuture<Void>> futures = new LinkedList<>();
try (Repository repository = repositoryManager.openRepository(project)) {
ImmutableList<Ref> allMetaRefs =
repository.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
.filter(r -> r.getName().endsWith(RefNames.META_SUFFIX))
.collect(toImmutableList());
totalCopyApprovalsTasks.addAndGet(allMetaRefs.size());
for (List<Ref> slice : Lists.partition(allMetaRefs, SLICE_MAX_REFS)) {
futures.add(
executor.submit(
() -> {
copyApprovalsForSlice(project, slice, labelsCopiedListener);
return null;
}));
}
}
return futures;
}
private void copyApprovalsForSlice(
Project.NameKey project, List<Ref> slice, @Nullable Consumer<Change> labelsCopiedListener)
throws Exception {
try {
copyApprovalsForSlice(project, slice, labelsCopiedListener, false);
} catch (Exception e) {
failedForAtLeastOneProject = true;
logger.atSevere().withCause(e).log(
"Error in a slice of project %s, will retry and skip corrupt meta-refs", project);
copyApprovalsForSlice(project, slice, labelsCopiedListener, true);
}
logProgress();
}
private void copyApprovalsForSlice(
Project.NameKey project,
List<Ref> slice,
@Nullable Consumer<Change> labelsCopiedListener,
boolean checkForCorruptMetaRefs)
throws Exception {
logger.atInfo().log("copy-approvals for a slice of %s project", project);
try (BatchUpdate bu =
batchUpdateFactory.create(project, internalUserFactory.create(), TimeUtil.nowTs())) {
for (Ref metaRef : slice) {
Change.Id changeId = Change.Id.fromRef(metaRef.getName());
if (checkForCorruptMetaRefs && isCorrupt(project, changeId)) {
logger.atSevere().log("skipping corrupt meta-ref %s", metaRef.getName());
continue;
}
bu.addOp(
changeId,
new RecursiveApprovalCopier.PersistCopiedVotesOp(approvalsUtil, labelsCopiedListener));
}
BatchRefUpdate bru = bu.prepareRefUpdates();
if (bru != null) {
List<ReceiveCommand> cmds = bru.getCommands();
pendingRefUpdates.compute(
project,
(p, u) -> {
if (u == null) {
return new LinkedList<>(cmds);
}
u.addAll(cmds);
return u;
});
totalRefUpdates.addAndGet(cmds.size());
}
finishedCopyApprovalsTasks.addAndGet(slice.size());
}
}
private List<ListenableFuture<Void>> submitBatchRefUpdateTasks(boolean shouldLockLooseRefs) {
logger.atInfo().log("submitting batch ref-update tasks");
List<ListenableFuture<Void>> futures = new LinkedList<>();
for (Map.Entry<Project.NameKey, List<ReceiveCommand>> e : pendingRefUpdates.entrySet()) {
Project.NameKey project = e.getKey();
List<ReceiveCommand> updates = e.getValue();
futures.add(
executor.submit(
() -> {
executeRefUpdates(project, updates, shouldLockLooseRefs);
return null;
}));
}
return futures;
}
private void executeRefUpdates(
Project.NameKey project, List<ReceiveCommand> updates, boolean shouldLockLooseRefs)
throws RepositoryNotFoundException, IOException {
logger.atInfo().log(
"executing batch ref-update for project %s, size %d", project, updates.size());
try (Repository repository = repositoryManager.openRepository(project)) {
RefDatabase refdb = repository.getRefDatabase();
BatchRefUpdate bu;
if (refdb instanceof RefDirectory) {
bu = ((RefDirectory) refdb).newBatchUpdate(shouldLockLooseRefs);
} else {
bu = refdb.newBatchUpdate();
}
bu.addCommand(updates);
RefUpdateUtil.executeChecked(bu, repository);
gitRefUpdated.fire(project, bu, null);
finishedRefUpdates.addAndGet(updates.size());
logProgress();
}
}
private boolean isCorrupt(Project.NameKey project, Change.Id changeId) {
Change c = ChangeNotes.Factory.newChange(project, changeId);
try {
changeNotesFactory.createForBatchUpdate(c, true);
return false;
} catch (Exception e) {
logger.atSevere().withCause(e).log(e.getMessage());
return true;
}
}
public void persist(Change change) throws UpdateException, RestApiException {
Project.NameKey project = change.getProject();
try (BatchUpdate bu =
batchUpdateFactory.create(project, internalUserFactory.create(), TimeUtil.nowTs())) {
Change.Id changeId = change.getId();
bu.addOp(changeId, new PersistCopiedVotesOp(approvalsUtil, null));
bu.execute();
}
}
private void logProgress() {
logger.atInfo().log(
"copy-approvals tasks done: %d/%d, ref-update tasks done: %d/%d",
finishedCopyApprovalsTasks.get(),
totalCopyApprovalsTasks.get(),
finishedRefUpdates.get(),
totalRefUpdates.get());
}
private static class PersistCopiedVotesOp implements BatchUpdateOp {
private final ApprovalsUtil approvalsUtil;
private final Consumer<Change> listener;
PersistCopiedVotesOp(
ApprovalsUtil approvalsUtil, @Nullable Consumer<Change> labelsCopiedListener) {
this.approvalsUtil = approvalsUtil;
this.listener = labelsCopiedListener;
}
@Override
public boolean updateChange(ChangeContext ctx) throws IOException {
Change change = ctx.getChange();
ChangeUpdate update = ctx.getUpdate(change.currentPatchSetId(), change.getLastUpdatedOn());
approvalsUtil.persistCopiedApprovals(
ctx.getNotes(),
ctx.getNotes().getCurrentPatchSet(),
ctx.getRevWalk(),
ctx.getRepoView().getConfig(),
update);
boolean labelsCopied = update.hasCopiedApprovals();
if (labelsCopied && listener != null) {
listener.accept(change);
}
return labelsCopied;
}
}
}