blob: cb1af0775933cc112cfc0d18f14de6d48ad11a51 [file] [log] [blame]
// Copyright (C) 2012 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git.receive;
import static com.google.common.base.Preconditions.checkState;
import static com.google.gerrit.server.quota.QuotaGroupDefinitions.REPOSITORY_SIZE_GROUP;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.common.UsedAt;
import com.google.gerrit.common.data.Capable;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.metrics.Counter0;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.Description.Units;
import com.google.gerrit.metrics.Field;
import com.google.gerrit.metrics.Histogram1;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.metrics.Timer1;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.PublishCommentsOp;
import com.google.gerrit.server.cache.PerThreadCache;
import com.google.gerrit.server.config.AllUsersName;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.ReceiveCommitsExecutor;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.TaskKind;
import com.google.gerrit.server.git.PermissionAwareRepositoryManager;
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.TransferConfig;
import com.google.gerrit.server.git.UsersSelfAdvertiseRefsHook;
import com.google.gerrit.server.logging.Metadata;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.permissions.ProjectPermission;
import com.google.gerrit.server.project.ContributorAgreementsChecker;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.quota.QuotaBackend;
import com.google.gerrit.server.quota.QuotaException;
import com.google.gerrit.server.quota.QuotaResponse;
import com.google.gerrit.server.util.MagicBranch;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.inject.Inject;
import com.google.inject.PrivateModule;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Named;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.PreReceiveHook;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.transport.ReceiveCommand.Result;
import org.eclipse.jgit.transport.ReceivePack;
/**
* Hook that delegates to {@link ReceiveCommits} in a worker thread.
*
* <p>Since the work that {@link ReceiveCommits} does may take a long, potentially unbounded amount
* of time, it runs in the background so it can be monitored for timeouts and cancelled, and have
* stalls reported to the user from the main thread.
*/
public class AsyncReceiveCommits {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String RECEIVE_OVERALL_TIMEOUT_NAME = "ReceiveCommitsOverallTimeout";
private static final String RECEIVE_CANCELLATION_TIMEOUT_NAME =
"ReceiveCommitsCancellationTimeout";
/**
* Assisted-injection factory for {@link AsyncReceiveCommits}.
*
* <p>The four arguments correspond to the {@code @Assisted} constructor parameters; every other
* constructor dependency is injected. The factory is bound and exposed by {@link
* AsyncReceiveCommitsModule}.
*/
public interface Factory {
/**
* Creates a new {@link AsyncReceiveCommits}.
*
* @param projectState state of the project that is pushed to
* @param user user on whose behalf the push is processed (used for permission checks and as
*     reflog identity)
* @param repository repository that receives the push
* @param messageSender sink for messages sent back to the client, may be {@code null}
* @return the newly constructed instance
*/
AsyncReceiveCommits create(
ProjectState projectState,
IdentifiedUser user,
Repository repository,
@Nullable MessageSender messageSender);
}
/**
 * Private Guice module that wires up everything needed to build an {@link AsyncReceiveCommits}.
 *
 * <p>Only {@link AsyncReceiveCommits.Factory} is exposed; the remaining factories and the two
 * named timeout values stay private to this module.
 */
public static class AsyncReceiveCommitsModule extends PrivateModule {
  @Override
  public void configure() {
    installFactory(LazyPostReceiveHookChain.Factory.class);
    installFactory(AsyncReceiveCommits.Factory.class);
    expose(AsyncReceiveCommits.Factory.class);

    // ReceiveCommits.Factory is deliberately NOT exposed: callers are expected to go through
    // AsyncReceiveCommits.Factory instead.
    installFactory(ReceiveCommits.Factory.class);
    installFactory(PublishCommentsOp.Factory.class);
    installFactory(BranchCommitValidator.Factory.class);
  }

  /** Installs an assisted-injection binding for the given factory interface. */
  private void installFactory(Class<?> factoryInterface) {
    install(new FactoryModuleBuilder().build(factoryInterface));
  }

  /**
   * Reads {@code receive.timeout} from the server config.
   *
   * @return the overall receive timeout in milliseconds; 4 minutes if not configured
   */
  @Provides
  @Singleton
  @Named(RECEIVE_OVERALL_TIMEOUT_NAME)
  long getReceiveTimeoutMillis(@GerritServerConfig Config cfg) {
    long defaultMillis = TimeUnit.MINUTES.toMillis(4);
    return ConfigUtil.getTimeUnit(
        cfg, "receive", null, "timeout", defaultMillis, TimeUnit.MILLISECONDS);
  }

  /**
   * Reads {@code receive.cancellationTimeout} from the server config.
   *
   * @return the cancellation timeout in milliseconds; 5 seconds if not configured
   */
  @Provides
  @Singleton
  @Named(RECEIVE_CANCELLATION_TIMEOUT_NAME)
  long getCancellationTimeoutMillis(@GerritServerConfig Config cfg) {
    long defaultMillis = TimeUnit.SECONDS.toMillis(5);
    return ConfigUtil.getTimeUnit(
        cfg, "receive", null, "cancellationTimeout", defaultMillis, TimeUnit.MILLISECONDS);
  }
}
/**
 * Creates a {@link MultiProgressMonitor} of kind {@link TaskKind#RECEIVE_COMMITS} whose output
 * is forwarded to the given message sender.
 *
 * @param multiProgressMonitorFactory factory used to create the monitor
 * @param messageSender receiver of every byte the monitor writes
 * @return the new monitor, titled "Processing changes"
 */
private static MultiProgressMonitor newMultiProgressMonitor(
    MultiProgressMonitor.Factory multiProgressMonitorFactory, MessageSender messageSender) {
  // Thin adapter: each OutputStream operation maps onto the matching MessageSender call.
  OutputStream senderStream =
      new OutputStream() {
        @Override
        public void write(int b) {
          byte[] single = {(byte) b};
          messageSender.sendBytes(single);
        }

        @Override
        public void write(byte[] what) {
          messageSender.sendBytes(what);
        }

        @Override
        public void write(byte[] what, int off, int len) {
          messageSender.sendBytes(what, off, len);
        }

        @Override
        public void flush() {
          messageSender.flush();
        }
      };
  return multiProgressMonitorFactory.create(
      senderStream, TaskKind.RECEIVE_COMMITS, "Processing changes");
}
/** Classification of a push, used as the {@code type} field of the receive-commits metrics. */
private enum PushType {
// Reported by reportMetrics when the result says the push was a magic push.
CREATE_REPLACE,
// Reported for a non-magic push that did not auto-close any change.
NORMAL,
// Reported for a non-magic push that auto-closed at least one change.
AUTOCLOSE,
}
/** Holder for the metrics recorded by {@link AsyncReceiveCommits}. */
@Singleton
private static class Metrics {
  private final Histogram1<PushType> changes;
  private final Timer1<PushType> latencyPerChange;
  private final Timer1<PushType> latencyPerPush;
  private final Counter0 timeouts;

  @Inject
  Metrics(MetricMaker metricMaker) {
    // The changes histogram is never recorded with PushType.NORMAL, which is why its field
    // description leaves that value out.
    Field<PushType> changesTypeField =
        Field.ofEnum(PushType.class, "type", Metadata.Builder::pushType)
            .description("type of push (create/replace, autoclose)")
            .build();
    Description changesDescription =
        new Description("number of changes uploaded in a single push.").setCumulative();
    changes =
        metricMaker.newHistogram(
            "receivecommits/changes_per_push", changesDescription, changesTypeField);

    // Both latency timers share one field definition that covers all three push types.
    Field<PushType> latencyTypeField =
        Field.ofEnum(PushType.class, "type", Metadata.Builder::pushType)
            .description("type of push (create/replace, autoclose, normal)")
            .build();

    Description perChangeDescription =
        new Description(
                "Processing delay per push divided by the number of changes in said push. "
                    + "(Only includes pushes which contain changes.)")
            .setUnit(Units.MILLISECONDS)
            .setCumulative();
    latencyPerChange =
        metricMaker.newTimer(
            "receivecommits/latency_per_push_per_change", perChangeDescription, latencyTypeField);

    Description perPushDescription =
        new Description("processing delay for a processing single push")
            .setUnit(Units.MILLISECONDS)
            .setCumulative();
    latencyPerPush =
        metricMaker.newTimer(
            "receivecommits/latency_per_push", perPushDescription, latencyTypeField);

    Description timeoutDescription = new Description("rate of push timeouts").setRate();
    timeouts = metricMaker.newCounter("receivecommits/timeout", timeoutDescription);
  }
}
// Creates the progress monitor that preReceive() waits on.
private final MultiProgressMonitor.Factory multiProgressMonitorFactory;
private final Metrics metrics;
// Does the actual command processing; invoked from the worker task submitted in preReceive().
private final ReceiveCommits receiveCommits;
// Permissions of `user` on the project being pushed to.
private final PermissionBackend.ForProject perm;
// The JGit receive pack configured by the constructor; this instance is its pre-receive hook.
private final ReceivePack receivePack;
// Executor on which the ReceiveCommits work is run.
private final ExecutorService executor;
// Wraps the worker task before it is submitted to `executor`.
private final RequestScopePropagator scopePropagator;
private final ReceiveConfig receiveConfig;
private final ContributorAgreementsChecker contributorAgreements;
// Overall time budget for processing one push, in milliseconds.
private final long receiveTimeoutMillis;
// Second timeout passed to MultiProgressMonitor#waitFor, in milliseconds.
private final long cancellationTimeoutMillis;
private final ProjectState projectState;
private final IdentifiedUser user;
private final Repository repo;
// Installed into the advertise-refs hook chain and handed to ReceiveCommits.
private final AllRefsWatcher allRefsWatcher;
/**
* Configures the {@link ReceivePack} for a push and creates the {@link ReceiveCommits} instance
* that will process it.
*
* <p>The parameters annotated with {@code @Assisted} are supplied through {@link Factory#create};
* all others are injected.
*
* @throws PermissionBackendException propagated from the permission backend calls made here
* @throws RuntimeException wrapping a {@link QuotaException} if the request for available {@code
*     REPOSITORY_SIZE_GROUP} quota tokens reports an error
*/
@Inject
AsyncReceiveCommits(
MultiProgressMonitor.Factory multiProgressMonitorFactory,
ReceiveCommits.Factory factory,
PermissionBackend permissionBackend,
Provider<InternalChangeQuery> queryProvider,
@ReceiveCommitsExecutor ExecutorService executor,
RequestScopePropagator scopePropagator,
ReceiveConfig receiveConfig,
TransferConfig transferConfig,
LazyPostReceiveHookChain.Factory lazyPostReceive,
ContributorAgreementsChecker contributorAgreements,
Metrics metrics,
QuotaBackend quotaBackend,
UsersSelfAdvertiseRefsHook usersSelfAdvertiseRefsHook,
AllUsersName allUsersName,
@Named(RECEIVE_OVERALL_TIMEOUT_NAME) long receiveTimeoutMillis,
@Named(RECEIVE_CANCELLATION_TIMEOUT_NAME) long cancellationTimeoutMillis,
@Assisted ProjectState projectState,
@Assisted IdentifiedUser user,
@Assisted Repository repo,
@Assisted @Nullable MessageSender messageSender)
throws PermissionBackendException {
this.multiProgressMonitorFactory = multiProgressMonitorFactory;
this.executor = executor;
this.scopePropagator = scopePropagator;
this.receiveConfig = receiveConfig;
this.contributorAgreements = contributorAgreements;
this.receiveTimeoutMillis = receiveTimeoutMillis;
this.cancellationTimeoutMillis = cancellationTimeoutMillis;
this.projectState = projectState;
this.user = user;
this.repo = repo;
this.metrics = metrics;
Project.NameKey projectName = projectState.getNameKey();
this.perm = permissionBackend.user(user).project(projectName);
// The receive pack operates on a permission-aware view of the repository.
receivePack = new ReceivePack(PermissionAwareRepositoryManager.wrap(repo, perm));
receivePack.setAllowCreates(true);
receivePack.setAllowDeletes(true);
receivePack.setAllowNonFastForwards(true);
receivePack.setRefLogIdent(user.newRefLogIdent());
receivePack.setTimeout(transferConfig.getTimeout());
receivePack.setMaxObjectSizeLimit(projectState.getEffectiveMaxObjectSizeLimit().value);
receivePack.setCheckReceivedObjects(projectState.getConfig().getCheckReceivedObjects());
receivePack.setRefFilter(new ReceiveRefFilter());
receivePack.setAllowPushOptions(true);
// This object is its own pre-receive hook; the lambda only dereferences receiveCommits when a
// push actually runs, i.e. after this constructor has finished.
receivePack.setPreReceiveHook(asHook());
receivePack.setPostReceiveHook(lazyPostReceive.create(user, projectName));
// If the user lacks READ permission, some references may be filtered and hidden from view.
// Check objects mentioned inside the incoming pack file are reachable from visible refs.
if (!projectState.statePermitsRead() || !this.perm.test(ProjectPermission.READ)) {
receivePack.setCheckReferencedObjectsAreReachable(
receiveConfig.checkReferencedObjectsAreReachable);
}
allRefsWatcher = new AllRefsWatcher();
receivePack.setAdvertiseRefsHook(
ReceiveCommitsAdvertiseRefsHookChain.create(
allRefsWatcher,
usersSelfAdvertiseRefsHook,
allUsersName,
queryProvider,
projectName,
user.getAccountId()));
receiveCommits =
factory.create(projectState, user, receivePack, repo, allRefsWatcher, messageSender);
receiveCommits.init();
// Ask the quota backend how much repository size is still available for this project.
QuotaResponse.Aggregated availableTokens =
quotaBackend.user(user).project(projectName).availableTokens(REPOSITORY_SIZE_GROUP);
try {
availableTokens.throwOnError();
} catch (QuotaException e) {
logger.atWarning().withCause(e).log(
"Quota %s availableTokens request failed for project %s",
REPOSITORY_SIZE_GROUP, projectName);
throw new RuntimeException(e);
}
// When the quota backend reports a token count, use it as the maximum pack size.
availableTokens.availableTokens().ifPresent(receivePack::setMaxPackSizeLimit);
}
/**
 * Determines whether the user may upload commits to the project.
 *
 * <p>The checks run in this order: push permission on at least one ref, contributor agreements,
 * and (only if enabled in the receive config) the magic branch ref check.
 *
 * @return {@link Capable#OK} if uploading is allowed, otherwise a {@link Capable} carrying the
 *     reason for the denial
 */
public Capable canUpload() throws IOException, PermissionBackendException {
  boolean mayPushSomewhere = perm.test(ProjectPermission.PUSH_AT_LEAST_ONE_REF);
  if (!mayPushSomewhere) {
    String denial = "Upload denied for project '" + projectState.getName() + "'";
    return new Capable(denial);
  }

  try {
    contributorAgreements.check(projectState.getNameKey(), user);
  } catch (AuthException e) {
    // The exception message is what gets reported as the reason.
    return new Capable(e.getMessage());
  }

  return receiveConfig.checkMagicRefs
      ? MagicBranch.checkMagicBranchRefs(repo, projectState.getProject())
      : Capable.OK;
}
/**
 * Returns a {@link PreReceiveHook} implementation that can be used directly by JGit when
 * processing a push.
 *
 * <p>The hook delegates to {@link #preReceive(Collection)} and reports metrics on success. On a
 * timeout or any other failure it sends an error to the client and rejects every command that
 * has not been attempted yet. Messages queued so far are flushed in all cases.
 *
 * <p>The hook may only be invoked with the {@link ReceivePack} owned by this instance; any other
 * receive pack fails the {@code checkState} precondition.
 */
public PreReceiveHook asHook() {
  return (rp, commands) -> {
    checkState(receivePack == rp, "can't perform PreReceive for a different receive pack");
    long startNanos = System.nanoTime();
    ReceiveCommitsResult result;
    try {
      result = preReceive(commands);
    } catch (TimeoutException e) {
      // preReceive has already logged the timeout and bumped the timeout counter.
      receivePack.sendError("timeout while processing changes");
      rejectCommandsNotAttempted(commands);
      return;
    } catch (Exception e) {
      // Log the exception itself rather than e.getCause(): exceptions that carry no cause
      // (e.g. an IllegalStateException or NullPointerException) would otherwise be logged
      // without any stack trace. The wrapped cause is still part of the logged chain.
      logger.atSevere().withCause(e).log("error while processing push");
      receivePack.sendError("internal error");
      rejectCommandsNotAttempted(commands);
      return;
    } finally {
      // Flush the messages queued up until now (if any).
      receiveCommits.sendMessages();
    }
    // Only reached when processing succeeded.
    reportMetrics(result, System.nanoTime() - startNanos);
  };
}
/**
 * Processes {@code commands}, applies them to Git storage and communicates back on the wire.
 *
 * <p>The work is submitted to the receive-commits executor while the calling thread waits on a
 * {@link MultiProgressMonitor}, bounded by the configured receive and cancellation timeouts.
 *
 * @param commands the commands of the push; if any of them already has a result other than
 *     {@link Result#NOT_ATTEMPTED}, nothing is processed
 * @return the result of processing, or an empty result if processing was skipped
 * @throws TimeoutException if waiting for the worker timed out; the timeout counter is
 *     incremented and a warning is logged before rethrowing
 * @throws UncheckedExecutionException wrapping the {@link InterruptedException} or {@link
 *     ExecutionException} raised while waiting for or retrieving the result
 */
@UsedAt(UsedAt.Project.GOOGLE)
public ReceiveCommitsResult preReceive(Collection<ReceiveCommand> commands)
    throws TimeoutException, UncheckedExecutionException {
  if (commands.stream().anyMatch(c -> c.getResult() != Result.NOT_ATTEMPTED)) {
    // Stop processing when command was already processed by previously invoked
    // pre-receive hooks
    return ReceiveCommitsResult.empty();
  }

  String currentThreadName = Thread.currentThread().getName();
  MultiProgressMonitor monitor =
      newMultiProgressMonitor(multiProgressMonitorFactory, receiveCommits.getMessageSender());
  Callable<ReceiveCommitsResult> callable =
      () -> {
        // Tag the worker thread with the name of the requesting thread while it works on this
        // push; the original name is restored afterwards.
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(oldName + "-for-" + currentThreadName);
        try (PerThreadCache threadLocalCache = PerThreadCache.create()) {
          return receiveCommits.processCommands(commands, monitor);
        } finally {
          Thread.currentThread().setName(oldName);
        }
      };

  try {
    // WorkQueue does not support Callable<T>, so we have to convert it here.
    FutureTask<ReceiveCommitsResult> runnable =
        ProjectRunnable.fromCallable(
            callable, receiveCommits.getProject().getNameKey(), "receive-commits", null, false);
    monitor.waitFor(
        executor.submit(scopePropagator.wrap(runnable)),
        receiveTimeoutMillis,
        TimeUnit.MILLISECONDS,
        cancellationTimeoutMillis,
        TimeUnit.MILLISECONDS);
    if (!runnable.isDone()) {
      // At this point we are either done or have thrown a TimeoutException and bailed out.
      throw new IllegalStateException("unable to get receive commits result");
    }
    return runnable.get();
  } catch (TimeoutException e) {
    metrics.timeouts.increment();
    logger.atWarning().withCause(e).log(
        "Timeout in ReceiveCommits while processing changes for project %s",
        projectState.getName());
    throw e;
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe that this
    // thread was interrupted; the exception type seen by callers is unchanged.
    Thread.currentThread().interrupt();
    throw new UncheckedExecutionException(e);
  } catch (ExecutionException e) {
    throw new UncheckedExecutionException(e);
  }
}
/**
 * Records the push metrics for a processed push.
 *
 * <p>A magic push is reported as {@code CREATE_REPLACE} with the number of created plus replaced
 * changes. Any other push is reported as {@code AUTOCLOSE} if it auto-closed at least one
 * change, and as {@code NORMAL} otherwise. The per-change latency is only recorded when the push
 * contained changes; the per-push latency is always recorded.
 *
 * @param result result of processing the push
 * @param deltaNanos time spent processing the push, in nanoseconds
 */
@UsedAt(UsedAt.Project.GOOGLE)
public void reportMetrics(ReceiveCommitsResult result, long deltaNanos) {
  final PushType pushType;
  final int totalChanges;

  if (result.magicPush()) {
    ImmutableSet<Change.Id> createdIds =
        result.changes().get(ReceiveCommitsResult.ChangeStatus.CREATED);
    ImmutableSet<Change.Id> replacedIds =
        result.changes().get(ReceiveCommitsResult.ChangeStatus.REPLACED);
    pushType = PushType.CREATE_REPLACE;
    totalChanges = createdIds.size() + replacedIds.size();
    // Recorded even when the magic push touched no changes.
    metrics.changes.record(pushType, totalChanges);
  } else {
    ImmutableSet<Change.Id> autoclosedIds =
        result.changes().get(ReceiveCommitsResult.ChangeStatus.AUTOCLOSED);
    if (autoclosedIds.isEmpty()) {
      pushType = PushType.NORMAL;
      totalChanges = 0;
    } else {
      pushType = PushType.AUTOCLOSE;
      totalChanges = autoclosedIds.size();
      metrics.changes.record(pushType, totalChanges);
    }
  }

  if (totalChanges > 0) {
    long nanosPerChange = deltaNanos / totalChanges;
    metrics.latencyPerChange.record(pushType, nanosPerChange, NANOSECONDS);
  }
  metrics.latencyPerPush.record(pushType, deltaNanos, NANOSECONDS);
}
/**
* Sends all messages which have been collected while processing the push to the client.
*
* <p>This simply delegates to {@link ReceiveCommits#sendMessages()}. The hook returned by {@link
* #asHook()} already calls that method itself once processing has finished.
*
* @see ReceiveCommits#sendMessages()
*/
@UsedAt(UsedAt.Project.GOOGLE)
public void sendMessages() {
receiveCommits.sendMessages();
}
/**
* Returns the {@link ReceivePack} that was created and configured by the constructor of this
* instance.
*/
public ReceivePack getReceivePack() {
return receivePack;
}
/**
 * Rejects every command that is still {@link Result#NOT_ATTEMPTED} by setting its result to
 * {@link Result#REJECTED_OTHER_REASON} with the message "internal error". Commands that already
 * carry another result are left untouched.
 *
 * <p>Intended for finishing up the remaining commands after processing has failed.
 *
 * @param commands the commands of the push
 */
private static void rejectCommandsNotAttempted(Collection<ReceiveCommand> commands) {
  commands.stream()
      .filter(cmd -> cmd.getResult() == Result.NOT_ATTEMPTED)
      .forEach(cmd -> cmd.setResult(Result.REJECTED_OTHER_REASON, "internal error"));
}
}