blob: 255c51bc378fc21e4d8db86aa2610c5cacd48e6e [file] [log] [blame]
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gerrit.common.TimeUtil;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.git.ChangeCache;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.TagCache;
import com.google.gerrit.server.git.VisibleRefFilter;
import com.google.gerrit.server.project.NoSuchProjectException;
import com.google.gerrit.server.project.ProjectControl;
import com.google.gerrit.server.util.IdGenerator;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
import com.jcraft.jsch.JSchException;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.RemoteRepositoryException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.FetchConnection;
import org.eclipse.jgit.transport.PushResult;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
import org.slf4j.MDC;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
/**
* A push to remote operation started by {@link GitReferenceUpdatedListener}.
* <p>
* Instance members are protected by the lock within PushQueue. Callers must
* take that lock to ensure they are working with a current view of the object.
*/
class PushOne implements ProjectRunnable {
private static final Logger repLog = ReplicationQueue.repLog;
private static final ReplicationStateLogger stateLog =
new ReplicationStateLogger(repLog);
static final String ALL_REFS = "..all..";
static final String ID_MDC_KEY = "pushOneId";
interface Factory {
PushOne create(Project.NameKey d, URIish u);
}
private final GitRepositoryManager gitManager;
private final SchemaFactory<ReviewDb> schema;
private final Destination pool;
private final RemoteConfig config;
private final CredentialsProvider credentialsProvider;
private final TagCache tagCache;
private final PerThreadRequestScope.Scoper threadScoper;
private final ChangeCache changeCache;
private final ReplicationQueue replicationQueue;
private final Project.NameKey projectName;
private final URIish uri;
private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
private boolean pushAllRefs;
private Repository git;
private boolean retrying;
private int retryCount;
private boolean canceled;
private final Multimap<String,ReplicationState> stateMap =
LinkedListMultimap.create();
private final int maxLockRetries;
private int lockRetryCount;
private final int id;
private final long createdAt;
@Inject
PushOne(final GitRepositoryManager grm,
final SchemaFactory<ReviewDb> s,
final Destination p,
final RemoteConfig c,
final CredentialsFactory cpFactory,
final TagCache tc,
final PerThreadRequestScope.Scoper ts,
final ChangeCache cc,
final ReplicationQueue rq,
final IdGenerator ig,
@Assisted final Project.NameKey d,
@Assisted final URIish u) {
gitManager = grm;
schema = s;
pool = p;
config = c;
credentialsProvider = cpFactory.create(c.getName());
tagCache = tc;
threadScoper = ts;
changeCache = cc;
replicationQueue = rq;
projectName = d;
uri = u;
lockRetryCount = 0;
maxLockRetries = pool.getLockErrorMaxRetries();
id = ig.next();
createdAt = TimeUtil.nowMs();
}
@Override
public Project.NameKey getProjectNameKey() {
return projectName;
}
@Override
public String getRemoteName() {
return config.getName();
}
@Override
public boolean hasCustomizedPrint() {
return true;
}
@Override
public String toString() {
String print = "[" + IdGenerator.format(id) + "] push " + uri;
if (retryCount > 0) {
print = "(retry " + retryCount + ") " + print;
}
return print;
}
boolean isRetrying() {
return retrying;
}
void setToRetry() {
retrying = true;
retryCount++;
}
void cancel() {
canceled = true;
}
boolean wasCanceled() {
return canceled;
}
URIish getURI() {
return uri;
}
void addRef(String ref) {
if (ALL_REFS.equals(ref)) {
delta.clear();
pushAllRefs = true;
repLog.trace("Added all refs for replication to " + uri);
} else if (!pushAllRefs) {
delta.add(ref);
repLog.trace("Added ref " + ref + " for replication to " + uri);
}
}
Set<String> getRefs() {
return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
}
void addRefs(Set<String> refs) {
if (!pushAllRefs) {
for (String ref : refs) {
addRef(ref);
}
}
}
void addState(String ref, ReplicationState state) {
stateMap.put(ref, state);
}
Multimap<String,ReplicationState> getStates() {
return stateMap;
}
ReplicationState[] getStatesAsArray() {
Set<ReplicationState> statesSet = new HashSet<>();
statesSet.addAll(stateMap.values());
return statesSet.toArray(new ReplicationState[statesSet.size()]);
}
ReplicationState[] getStatesByRef(String ref) {
Collection<ReplicationState> states = stateMap.get(ref);
return states.toArray(new ReplicationState[states.size()]);
}
void addStates(Multimap<String,ReplicationState> states) {
stateMap.putAll(states);
}
void removeStates() {
stateMap.clear();
}
private void statesCleanUp() {
if (!stateMap.isEmpty() && !isRetrying()) {
for (Map.Entry<String,ReplicationState> entry : stateMap.entries()) {
entry.getValue().notifyRefReplicated(projectName.get(), entry.getKey(), uri,
RefPushResult.FAILED);
}
}
}
@Override
public void run() {
try {
threadScoper.scope(new Callable<Void>(){
@Override
public Void call() {
runPushOperation();
return null;
}
}).call();
} catch (Exception e) {
throw Throwables.propagate(e);
} finally {
statesCleanUp();
}
}
private void runPushOperation() {
// Lock the queue, and remove ourselves, so we can't be modified once
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
MDC.put(ID_MDC_KEY, IdGenerator.format(id));
if (!pool.requestRunway(this)) {
if (!canceled) {
repLog.info("Rescheduling replication to " + uri
+ " to avoid collision with an in-flight push.");
pool.reschedule(this, Destination.RetryReason.COLLISION);
}
return;
}
long startedAt = TimeUtil.nowMs();
repLog.info("Replication to " + uri + " started...");
try {
git = gitManager.openRepository(projectName);
runImpl();
long finishedAt = TimeUtil.nowMs();
repLog.info("Replication to " + uri + " completed in "
+ (finishedAt - startedAt) + "ms, "
+ (startedAt - createdAt) + "ms delay, " + retryCount + " retries");
} catch (RepositoryNotFoundException e) {
stateLog.error("Cannot replicate " + projectName
+ "; Local repository error: "
+ e.getMessage(), getStatesAsArray());
} catch (RemoteRepositoryException e) {
// Tried to replicate to a remote via anonymous git:// but the repository
// does not exist. In this case NoRemoteRepositoryException is not
// raised.
final String msg = e.getMessage();
if (msg.contains("access denied") || msg.contains("no such repository")) {
createRepository();
} else {
repLog.error("Cannot replicate " + projectName
+ "; Remote repository error: " + msg);
}
} catch (NoRemoteRepositoryException e) {
createRepository();
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
Throwable cause = e.getCause();
if (cause instanceof JSchException
&& cause.getMessage().startsWith("UnknownHostKey:")) {
repLog.error("Cannot replicate to " + uri + ": " + cause.getMessage());
} else if (e instanceof LockFailureException) {
lockRetryCount++;
// The LockFailureException message contains both URI and reason
// for this failure.
repLog.error("Cannot replicate to " + e.getMessage());
// The remote push operation should be retried.
if (lockRetryCount <= maxLockRetries) {
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
} else {
repLog.error("Giving up after " + lockRetryCount
+ " of this error during replication to " + e.getMessage());
}
} else {
repLog.error("Cannot replicate to " + uri, e);
// The remote push operation should be retried.
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
} catch (IOException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (RuntimeException | Error e) {
stateLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
} finally {
if (git != null) {
git.close();
}
pool.notifyFinished(this);
}
}
private void createRepository() {
if (pool.isCreateMissingRepos()) {
try {
final Ref head = git.getRef(Constants.HEAD);
if (replicationQueue.createProject(projectName, head != null ? head.getName() : null)) {
repLog.warn("Missing repository created; retry replication to " + uri);
pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
} else {
repLog.warn("Missing repository could not be created when replicating " + uri +
". You can only create missing repositories locally, over SSH or when " +
"using adminUrl in replication.config. See documentation for more information.");
}
} catch (IOException ioe) {
stateLog.error("Cannot replicate to " + uri + "; failed to create missing repository",
ioe, getStatesAsArray());
}
} else {
stateLog.error("Cannot replicate to " + uri + "; repository not found", getStatesAsArray());
}
}
private void runImpl() throws IOException {
Transport tn = Transport.open(git, uri);
PushResult res;
try {
res = pushVia(tn);
} finally {
try {
tn.close();
} catch (Throwable e2) {
repLog.warn("Unexpected error while closing " + uri, e2);
}
}
updateStates(res.getRemoteUpdates());
}
private PushResult pushVia(Transport tn)
throws IOException, NotSupportedException, TransportException {
tn.applyConfig(config);
tn.setCredentialsProvider(credentialsProvider);
List<RemoteRefUpdate> todo = generateUpdates(tn);
if (todo.isEmpty()) {
// If we have no commands selected, we have nothing to do.
// Calling JGit at this point would just redo the work we
// already did, and come up with the same answer. Instead
// send back an empty result.
return new PushResult();
}
repLog.info("Push to " + uri + " references: " + todo);
return tn.push(NullProgressMonitor.INSTANCE, todo);
}
private List<RemoteRefUpdate> generateUpdates(Transport tn)
throws IOException {
ProjectControl pc;
try {
pc = pool.controlFor(projectName);
} catch (NoSuchProjectException e) {
return Collections.emptyList();
}
Map<String, Ref> local = git.getAllRefs();
if (!pc.allRefsAreVisible()) {
if (!pushAllRefs) {
// If we aren't mirroring, reduce the space we need to filter
// to only the references we will update during this operation.
//
Map<String, Ref> n = Maps.newHashMap();
for (String src : delta) {
Ref r = local.get(src);
if (r != null) {
n.put(src, r);
}
}
local = n;
}
ReviewDb db;
try {
db = schema.open();
} catch (OrmException e) {
stateLog.error("Cannot read database to replicate to " + projectName, e, getStatesAsArray());
return Collections.emptyList();
}
try {
local = new VisibleRefFilter(tagCache, changeCache, git, pc, db, true)
.filter(local, true);
} finally {
db.close();
}
}
return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
}
private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
throws NotSupportedException, TransportException, IOException {
List<RemoteRefUpdate> cmds = Lists.newArrayList();
boolean noPerms = !pool.isReplicatePermissions();
Map<String, Ref> remote = listRemote(tn);
for (Ref src : local.values()) {
if (!canPushRef(src.getName(), noPerms)) {
continue;
}
RefSpec spec = matchSrc(src.getName());
if (spec != null) {
Ref dst = remote.get(spec.getDestination());
if (dst == null || !src.getObjectId().equals(dst.getObjectId())) {
// Doesn't exist yet, or isn't the same value, request to push.
push(cmds, spec, src);
}
}
}
if (config.isMirror()) {
for (Ref ref : remote.values()) {
if (!Constants.HEAD.equals(ref.getName())) {
RefSpec spec = matchDst(ref.getName());
if (spec != null && !local.containsKey(spec.getSource())) {
// No longer on local side, request removal.
delete(cmds, spec);
}
}
}
}
return cmds;
}
private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local)
throws IOException {
List<RemoteRefUpdate> cmds = Lists.newArrayList();
boolean noPerms = !pool.isReplicatePermissions();
for (String src : delta) {
RefSpec spec = matchSrc(src);
if (spec != null) {
// If the ref still exists locally, send it, otherwise delete it.
Ref srcRef = local.get(src);
if (srcRef != null && canPushRef(src, noPerms)) {
push(cmds, spec, srcRef);
} else if (config.isMirror()) {
delete(cmds, spec);
}
}
}
return cmds;
}
private boolean canPushRef(String ref, boolean noPerms) {
return !(noPerms && RefNames.REFS_CONFIG.equals(ref)) &&
!ref.startsWith(RefNames.REFS_CACHE_AUTOMERGE);
}
private Map<String, Ref> listRemote(Transport tn)
throws NotSupportedException, TransportException {
FetchConnection fc = tn.openFetch();
try {
return fc.getRefsMap();
} finally {
fc.close();
}
}
private RefSpec matchSrc(String ref) {
for (RefSpec s : config.getPushRefSpecs()) {
if (s.matchSource(ref)) {
return s.expandFromSource(ref);
}
}
return null;
}
private RefSpec matchDst(String ref) {
for (RefSpec s : config.getPushRefSpecs()) {
if (s.matchDestination(ref)) {
return s.expandFromDestination(ref);
}
}
return null;
}
private void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src)
throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, src, dst, force, null, null));
}
private void delete(List<RemoteRefUpdate> cmds, RefSpec spec)
throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
}
private void updateStates(Collection<RemoteRefUpdate> refUpdates)
throws LockFailureException {
Set<String> doneRefs = new HashSet<>();
boolean anyRefFailed = false;
for (RemoteRefUpdate u : refUpdates) {
RefPushResult pushStatus = RefPushResult.SUCCEEDED;
Set<ReplicationState> logStates = new HashSet<>();
logStates.addAll(stateMap.get(u.getSrcRef()));
logStates.addAll(stateMap.get(ALL_REFS));
ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
doneRefs.add(u.getSrcRef());
switch (u.getStatus()) {
case OK:
case UP_TO_DATE:
case NON_EXISTING:
break;
case NOT_ATTEMPTED:
case AWAITING_REPORT:
case REJECTED_NODELETE:
case REJECTED_NONFASTFORWARD:
case REJECTED_REMOTE_CHANGED:
stateLog.error(String.format("Failed replicate of %s to %s: status %s",
u.getRemoteName(), uri, u.getStatus()), logStatesArray);
pushStatus = RefPushResult.FAILED;
anyRefFailed = true;
break;
case REJECTED_OTHER_REASON:
if ("non-fast-forward".equals(u.getMessage())) {
stateLog.error(String.format("Failed replicate of %s to %s"
+ ", remote rejected non-fast-forward push."
+ " Check receive.denyNonFastForwards variable in config file"
+ " of destination repository.", u.getRemoteName(), uri), logStatesArray);
} else if ("failed to lock".equals(u.getMessage())) {
throw new LockFailureException(uri, u.getMessage());
} else {
stateLog.error(String.format(
"Failed replicate of %s to %s, reason: %s",
u.getRemoteName(), uri, u.getMessage()), logStatesArray);
}
pushStatus = RefPushResult.FAILED;
anyRefFailed = true;
break;
}
for (ReplicationState rs : getStatesByRef(u.getSrcRef())) {
rs.notifyRefReplicated(projectName.get(), u.getSrcRef(),
uri, pushStatus);
}
}
doneRefs.add(ALL_REFS);
for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
rs.notifyRefReplicated(projectName.get(), ALL_REFS,
uri, anyRefFailed ? RefPushResult.FAILED : RefPushResult.SUCCEEDED);
}
for (Map.Entry<String,ReplicationState> entry : stateMap.entries()) {
if (!doneRefs.contains(entry.getKey())) {
entry.getValue().notifyRefReplicated(projectName.get(), entry.getKey(), uri,
RefPushResult.NOT_ATTEMPTED);
}
}
stateMap.clear();
}
public class LockFailureException extends TransportException {
private static final long serialVersionUID = 1L;
public LockFailureException(URIish uri, String message) {
super(uri, message);
}
}
}