blob: 4cff68eaab75d729bb71681de9f3bfe3a2070464 [file] [log] [blame]
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toMap;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
import com.google.gerrit.metrics.Timer1;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
import com.google.gerrit.server.ioutil.HexFormat;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.permissions.ProjectPermission;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.RemoteRepositoryException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.FetchConnection;
import org.eclipse.jgit.transport.PushResult;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.MDC;
/**
* A push to remote operation started by {@link GitReferenceUpdatedListener}.
*
* <p>Instance members are protected by the lock within PushQueue. Callers must take that lock to
* ensure they are working with a current view of the object.
*/
class PushOne implements ProjectRunnable, CanceledWhileRunning {
private final ReplicationStateListener stateLog;
static final String ALL_REFS = "..all..";
static final String ID_MDC_KEY = "pushOneId";
interface Factory {
PushOne create(Project.NameKey d, URIish u);
}
private final GitRepositoryManager gitManager;
private final PermissionBackend permissionBackend;
private final Destination pool;
private final RemoteConfig config;
private final ReplicationConfig replConfig;
private final CredentialsProvider credentialsProvider;
private final PerThreadRequestScope.Scoper threadScoper;
private final Project.NameKey projectName;
private final URIish uri;
private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
private boolean pushAllRefs;
private Repository git;
private boolean retrying;
private int retryCount;
private final int maxRetries;
private boolean canceled;
private final ListMultimap<String, ReplicationState> stateMap = LinkedListMultimap.create();
private final int maxLockRetries;
private int lockRetryCount;
private final int id;
private final long createdAt;
private final ReplicationMetrics metrics;
private final ProjectCache projectCache;
private final CreateProjectTask.Factory createProjectFactory;
private final AtomicBoolean canceledWhileRunning;
private final TransportFactory transportFactory;
private DynamicItem<ReplicationPushFilter> replicationPushFilter;
@Inject
PushOne(
GitRepositoryManager grm,
PermissionBackend permissionBackend,
Destination p,
RemoteConfig c,
ReplicationConfig rc,
CredentialsFactory cpFactory,
PerThreadRequestScope.Scoper ts,
IdGenerator ig,
ReplicationStateListeners sl,
ReplicationMetrics m,
ProjectCache pc,
CreateProjectTask.Factory cpf,
TransportFactory tf,
@Assisted Project.NameKey d,
@Assisted URIish u) {
gitManager = grm;
this.permissionBackend = permissionBackend;
pool = p;
config = c;
replConfig = rc;
credentialsProvider = cpFactory.create(c.getName());
threadScoper = ts;
projectName = d;
uri = u;
lockRetryCount = 0;
maxLockRetries = pool.getLockErrorMaxRetries();
id = ig.next();
stateLog = sl;
createdAt = System.nanoTime();
metrics = m;
projectCache = pc;
createProjectFactory = cpf;
canceledWhileRunning = new AtomicBoolean(false);
maxRetries = p.getMaxRetries();
transportFactory = tf;
}
@Inject(optional = true)
public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
this.replicationPushFilter = replicationPushFilter;
}
@Override
public void cancel() {
repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI());
canceledByReplication();
pool.pushWasCanceled(this);
}
@Override
public void setCanceledWhileRunning() {
repLog.atInfo().log(
"Replication [%s] to %s was canceled while being executed",
HexFormat.fromInt(id), getURI());
canceledWhileRunning.set(true);
}
@Override
public Project.NameKey getProjectNameKey() {
return projectName;
}
@Override
public String getRemoteName() {
return config.getName();
}
@Override
public boolean hasCustomizedPrint() {
return true;
}
@Override
public String toString() {
String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
if (retryCount > 0) {
print = "(retry " + retryCount + ") " + print;
}
return print;
}
boolean isRetrying() {
return retrying;
}
boolean setToRetry() {
retrying = true;
retryCount++;
return maxRetries == 0 || retryCount <= maxRetries;
}
void canceledByReplication() {
canceled = true;
}
boolean wasCanceled() {
return canceled || canceledWhileRunning.get();
}
URIish getURI() {
return uri;
}
void addRef(String ref) {
if (ALL_REFS.equals(ref)) {
delta.clear();
pushAllRefs = true;
repLog.atFinest().log("Added all refs for replication to %s", uri);
} else if (!pushAllRefs) {
delta.add(ref);
repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
}
}
Set<String> getRefs() {
return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
}
void addRefs(Set<String> refs) {
if (!pushAllRefs) {
for (String ref : refs) {
addRef(ref);
}
}
}
void addState(String ref, ReplicationState state) {
stateMap.put(ref, state);
}
ListMultimap<String, ReplicationState> getStates() {
return stateMap;
}
ReplicationState[] getStatesAsArray() {
Set<ReplicationState> statesSet = new HashSet<>();
statesSet.addAll(stateMap.values());
return statesSet.toArray(new ReplicationState[statesSet.size()]);
}
ReplicationState[] getStatesByRef(String ref) {
Collection<ReplicationState> states = stateMap.get(ref);
return states.toArray(new ReplicationState[states.size()]);
}
public int getId() {
return id;
}
void addStates(ListMultimap<String, ReplicationState> states) {
stateMap.putAll(states);
}
void removeStates() {
stateMap.clear();
}
private void statesCleanUp() {
if (!stateMap.isEmpty() && !isRetrying()) {
for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
entry
.getValue()
.notifyRefReplicated(
projectName.get(), entry.getKey(), uri, RefPushResult.FAILED, null);
}
}
}
@Override
public void run() {
try {
threadScoper
.scope(
() -> {
runPushOperation();
return null;
})
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
statesCleanUp();
}
}
private void runPushOperation() {
// Lock the queue, and remove ourselves, so we can't be modified once
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
RunwayStatus status = pool.requestRunway(this);
if (!status.isAllowed()) {
if (status.isCanceled()) {
repLog.atInfo().log(
"PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
} else {
repLog.atInfo().log(
"Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
uri, HexFormat.fromInt(status.getInFlightPushId()));
pool.reschedule(this, Destination.RetryReason.COLLISION);
}
return;
}
repLog.atInfo().log("Replication to %s started...", uri);
Timer1.Context<String> destinationContext = metrics.start(config.getName());
try {
long startedAt = destinationContext.getStartTime();
long delay = NANOSECONDS.toMillis(startedAt - createdAt);
metrics.record(config.getName(), delay, retryCount);
git = gitManager.openRepository(projectName);
runImpl();
long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
metrics.recordSlowProjectReplication(
config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
}
repLog.atInfo().log(
"Replication to %s completed in %dms, %dms delay, %d retries",
uri, elapsed, delay, retryCount);
} catch (RepositoryNotFoundException e) {
stateLog.error(
"Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
getStatesAsArray());
} catch (RemoteRepositoryException e) {
// Tried to replicate to a remote via anonymous git:// but the repository
// does not exist. In this case NoRemoteRepositoryException is not
// raised.
String msg = e.getMessage();
if (msg.contains("access denied")
|| msg.contains("no such repository")
|| msg.contains("Git repository not found")
|| msg.contains("unavailable")) {
createRepository();
} else {
repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg);
}
} catch (NoRemoteRepositoryException e) {
createRepository();
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
Throwable cause = e.getCause();
if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
} else if (e instanceof LockFailureException) {
lockRetryCount++;
repLog.atSevere().log("Cannot replicate to %s due to lock failure", uri);
// The remote push operation should be retried.
if (lockRetryCount <= maxLockRetries) {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
} else {
repLog.atSevere().log(
"Giving up after %d lock failures during replication to %s", lockRetryCount, uri);
}
} else {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri);
// The remote push operation should be retried.
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
}
} catch (IOException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (PermissionBackendException | RuntimeException | Error e) {
stateLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
} finally {
pool.notifyFinished(this);
if (git != null) {
git.close();
}
}
}
private void logCanceledWhileRunningException(TransportException e) {
repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri);
}
private void createRepository() {
if (pool.isCreateMissingRepos()) {
try {
Ref head = git.exactRef(Constants.HEAD);
if (createProject(projectName, head != null ? getName(head) : null)) {
repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
} else {
repLog.atWarning().log(
"Missing repository could not be created when replicating %s", uri);
}
} catch (IOException ioe) {
stateLog.error(
"Cannot replicate to " + uri + "; failed to create missing repository",
ioe,
getStatesAsArray());
}
} else {
stateLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray());
}
}
private boolean createProject(Project.NameKey project, String head) {
return createProjectFactory.create(project, head).create();
}
private String getName(Ref ref) {
Ref target = ref;
while (target.isSymbolic()) {
target = target.getTarget();
}
return target.getName();
}
private void runImpl() throws IOException, PermissionBackendException {
PushResult res;
try (Transport tn = transportFactory.open(git, uri)) {
res = pushVia(tn);
}
updateStates(res.getRemoteUpdates());
}
private PushResult pushVia(Transport tn) throws IOException, PermissionBackendException {
tn.applyConfig(config);
tn.setCredentialsProvider(credentialsProvider);
List<RemoteRefUpdate> todo = generateUpdates(tn);
if (todo.isEmpty()) {
// If we have no commands selected, we have nothing to do.
// Calling JGit at this point would just redo the work we
// already did, and come up with the same answer. Instead
// send back an empty result.
return new PushResult();
}
if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) {
repLog.atInfo().log("Push to %s references: %s", uri, todo);
} else {
repLog.atInfo().log(
"Push to %s references (first %d of %d listed): %s",
uri,
replConfig.getMaxRefsToLog(),
todo.size(),
todo.subList(0, replConfig.getMaxRefsToLog()));
}
return tn.push(NullProgressMonitor.INSTANCE, todo);
}
private List<RemoteRefUpdate> generateUpdates(Transport tn)
throws IOException, PermissionBackendException {
ProjectState projectState = projectCache.checkedGet(projectName);
if (projectState == null) {
return Collections.emptyList();
}
Map<String, Ref> local =
git.getRefDatabase().getRefs().stream().collect(toMap(Ref::getName, r -> r));
boolean filter;
PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
try {
projectState.checkStatePermitsRead();
forProject.check(ProjectPermission.READ);
filter = false;
} catch (AuthException | ResourceConflictException e) {
filter = true;
}
if (filter) {
if (!pushAllRefs) {
// If we aren't mirroring, reduce the space we need to filter
// to only the references we will update during this operation.
//
Map<String, Ref> n = new HashMap<>();
for (String src : delta) {
Ref r = local.get(src);
if (r != null) {
n.put(src, r);
}
}
local = n;
}
local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
}
List<RemoteRefUpdate> remoteUpdatesList =
pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
return replicationPushFilter == null || replicationPushFilter.get() == null
? remoteUpdatesList
: replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
}
private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local) throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
boolean noPerms = !pool.isReplicatePermissions();
Map<String, Ref> remote = listRemote(tn);
for (Ref src : local.values()) {
if (!canPushRef(src.getName(), noPerms)) {
repLog.atFine().log("Skipping push of ref %s", src.getName());
continue;
}
RefSpec spec = matchSrc(src.getName());
if (spec != null) {
Ref dst = remote.get(spec.getDestination());
if (dst == null || !src.getObjectId().equals(dst.getObjectId())) {
// Doesn't exist yet, or isn't the same value, request to push.
push(cmds, spec, src);
}
}
}
if (config.isMirror()) {
for (Ref ref : remote.values()) {
if (Constants.HEAD.equals(ref.getName())) {
repLog.atFine().log("Skipping deletion of %s", ref.getName());
continue;
}
RefSpec spec = matchDst(ref.getName());
if (spec != null && !local.containsKey(spec.getSource())) {
// No longer on local side, request removal.
delete(cmds, spec);
}
}
}
return cmds;
}
private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local) throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
boolean noPerms = !pool.isReplicatePermissions();
for (String src : delta) {
RefSpec spec = matchSrc(src);
if (spec != null) {
// If the ref still exists locally, send it, otherwise delete it.
Ref srcRef = local.get(src);
// Second try to ensure that the ref is truly not found locally
if (srcRef == null) {
srcRef = git.exactRef(src);
}
if (srcRef != null && canPushRef(src, noPerms)) {
push(cmds, spec, srcRef);
} else if (config.isMirror()) {
delete(cmds, spec);
}
}
}
return cmds;
}
private boolean canPushRef(String ref, boolean noPerms) {
return !(noPerms && RefNames.REFS_CONFIG.equals(ref))
&& !ref.startsWith(RefNames.REFS_CACHE_AUTOMERGE);
}
private Map<String, Ref> listRemote(Transport tn)
throws NotSupportedException, TransportException {
try (FetchConnection fc = tn.openFetch()) {
return fc.getRefsMap();
}
}
private RefSpec matchSrc(String ref) {
for (RefSpec s : config.getPushRefSpecs()) {
if (s.matchSource(ref)) {
return s.expandFromSource(ref);
}
}
return null;
}
private RefSpec matchDst(String ref) {
for (RefSpec s : config.getPushRefSpecs()) {
if (s.matchDestination(ref)) {
return s.expandFromDestination(ref);
}
}
return null;
}
private void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, src, dst, force, null, null));
}
private void delete(List<RemoteRefUpdate> cmds, RefSpec spec) throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
}
private void updateStates(Collection<RemoteRefUpdate> refUpdates) throws LockFailureException {
Set<String> doneRefs = new HashSet<>();
boolean anyRefFailed = false;
RemoteRefUpdate.Status lastRefStatusError = RemoteRefUpdate.Status.OK;
for (RemoteRefUpdate u : refUpdates) {
RefPushResult pushStatus = RefPushResult.SUCCEEDED;
Set<ReplicationState> logStates = new HashSet<>();
logStates.addAll(stateMap.get(u.getSrcRef()));
logStates.addAll(stateMap.get(ALL_REFS));
ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
doneRefs.add(u.getSrcRef());
switch (u.getStatus()) {
case OK:
case UP_TO_DATE:
case NON_EXISTING:
break;
case NOT_ATTEMPTED:
case AWAITING_REPORT:
case REJECTED_NODELETE:
case REJECTED_NONFASTFORWARD:
case REJECTED_REMOTE_CHANGED:
stateLog.error(
String.format(
"Failed replicate of %s to %s: status %s", u.getRemoteName(), uri, u.getStatus()),
logStatesArray);
pushStatus = RefPushResult.FAILED;
anyRefFailed = true;
lastRefStatusError = u.getStatus();
break;
case REJECTED_OTHER_REASON:
if ("non-fast-forward".equals(u.getMessage())) {
stateLog.error(
String.format(
"Failed replicate of %s to %s"
+ ", remote rejected non-fast-forward push."
+ " Check receive.denyNonFastForwards variable in config file"
+ " of destination repository.",
u.getRemoteName(), uri),
logStatesArray);
} else if ("failed to lock".equals(u.getMessage())) {
throw new LockFailureException(uri, u.getMessage());
} else {
stateLog.error(
String.format(
"Failed replicate of %s to %s, reason: %s",
u.getRemoteName(), uri, u.getMessage()),
logStatesArray);
}
pushStatus = RefPushResult.FAILED;
anyRefFailed = true;
lastRefStatusError = u.getStatus();
break;
}
for (ReplicationState rs : getStatesByRef(u.getSrcRef())) {
rs.notifyRefReplicated(projectName.get(), u.getSrcRef(), uri, pushStatus, u.getStatus());
}
}
doneRefs.add(ALL_REFS);
for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
rs.notifyRefReplicated(
projectName.get(),
ALL_REFS,
uri,
anyRefFailed ? RefPushResult.FAILED : RefPushResult.SUCCEEDED,
lastRefStatusError);
}
for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
if (!doneRefs.contains(entry.getKey())) {
entry
.getValue()
.notifyRefReplicated(
projectName.get(), entry.getKey(), uri, RefPushResult.NOT_ATTEMPTED, null);
}
}
stateMap.clear();
}
public static class LockFailureException extends TransportException {
private static final long serialVersionUID = 1L;
LockFailureException(URIish uri, String message) {
super(uri, message);
}
}
}