blob: d4e22d9975224e80af8b97bad0ac7a9fd74a50ca [file] [log] [blame]
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.replication.pull;
import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.gerrit.entities.Project;
import com.google.gerrit.metrics.Timer1;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
import com.google.gerrit.server.ioutil.HexFormat;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.RemoteRepositoryException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.MDC;
/**
* A pull from remote operation started by command-line.
*
* <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
* ensure they are working with a current view of the object.
*/
public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
private final ReplicationStateListener stateLog;
static final String ALL_REFS = "..all..";
static final String ID_MDC_KEY = "fetchOneId";
interface Factory {
FetchOne create(Project.NameKey d, URIish u);
}
private final GitRepositoryManager gitManager;
private final Source pool;
private final RemoteConfig config;
private final PerThreadRequestScope.Scoper threadScoper;
private final Project.NameKey projectName;
private final URIish uri;
private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
private boolean fetchAllRefs;
private Repository git;
private boolean retrying;
private int retryCount;
private final int maxRetries;
private boolean canceled;
private final ListMultimap<String, ReplicationState> stateMap = LinkedListMultimap.create();
private final int maxLockRetries;
private int lockRetryCount;
private final int id;
private final long createdAt;
private final FetchReplicationMetrics metrics;
private final AtomicBoolean canceledWhileRunning;
private final FetchFactory fetchFactory;
@Inject
FetchOne(
GitRepositoryManager grm,
Source s,
SourceConfiguration c,
PerThreadRequestScope.Scoper ts,
IdGenerator ig,
ReplicationStateListeners sl,
FetchReplicationMetrics m,
FetchFactory fetchFactory,
@Assisted Project.NameKey d,
@Assisted URIish u) {
gitManager = grm;
pool = s;
config = c.getRemoteConfig();
threadScoper = ts;
projectName = d;
uri = u;
lockRetryCount = 0;
maxLockRetries = pool.getLockErrorMaxRetries();
id = ig.next();
stateLog = sl;
createdAt = System.nanoTime();
metrics = m;
canceledWhileRunning = new AtomicBoolean(false);
this.fetchFactory = fetchFactory;
maxRetries = s.getMaxRetries();
}
@Override
public void cancel() {
repLog.info("Replication {} was canceled", getURI());
canceledByReplication();
pool.fetchWasCanceled(this);
}
@Override
public void setCanceledWhileRunning() {
repLog.info("Replication {} was canceled while being executed", getURI());
canceledWhileRunning.set(true);
}
@Override
public Project.NameKey getProjectNameKey() {
return projectName;
}
@Override
public String getRemoteName() {
return config.getName();
}
@Override
public boolean hasCustomizedPrint() {
return true;
}
@Override
public String toString() {
String print = "[" + HexFormat.fromInt(id) + "] fetch " + uri;
if (retryCount > 0) {
print = "(retry " + retryCount + ") " + print;
}
return print;
}
boolean isRetrying() {
return retrying;
}
boolean setToRetry() {
retrying = true;
retryCount++;
return maxRetries == 0 || retryCount <= maxRetries;
}
void canceledByReplication() {
canceled = true;
}
boolean wasCanceled() {
return canceled;
}
URIish getURI() {
return uri;
}
void addRef(String ref) {
if (ALL_REFS.equals(ref)) {
delta.clear();
fetchAllRefs = true;
repLog.trace("Added all refs for replication from {}", uri);
} else if (!fetchAllRefs) {
delta.add(ref);
repLog.trace("Added ref {} for replication from {}", ref, uri);
}
}
Set<String> getRefs() {
return fetchAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
}
void addRefs(Set<String> refs) {
if (!fetchAllRefs) {
for (String ref : refs) {
addRef(ref);
}
}
}
void addState(String ref, ReplicationState state) {
stateMap.put(ref, state);
}
ListMultimap<String, ReplicationState> getStates() {
return stateMap;
}
ReplicationState[] getStatesAsArray() {
Set<ReplicationState> statesSet = new HashSet<>();
statesSet.addAll(stateMap.values());
return statesSet.toArray(new ReplicationState[statesSet.size()]);
}
ReplicationState[] getStatesByRef(String ref) {
Collection<ReplicationState> states = stateMap.get(ref);
return states.toArray(new ReplicationState[states.size()]);
}
void addStates(ListMultimap<String, ReplicationState> states) {
stateMap.putAll(states);
}
void removeStates() {
stateMap.clear();
}
private void statesCleanUp() {
if (!stateMap.isEmpty() && !isRetrying()) {
for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
entry
.getValue()
.notifyRefReplicated(
projectName.get(),
entry.getKey(),
uri,
ReplicationState.RefFetchResult.FAILED,
null);
}
}
}
@Override
public void run() {
try {
threadScoper
.scope(
new Callable<Void>() {
@Override
public Void call() {
runFetchOperation();
return null;
}
})
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
statesCleanUp();
}
}
private void runFetchOperation() {
// Lock the queue, and remove ourselves, so we can't be modified once
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
if (!pool.requestRunway(this)) {
if (!canceled) {
repLog.info(
"Rescheduling replication to {} to avoid collision with an in-flight fetch.", uri);
pool.reschedule(this, Source.RetryReason.COLLISION);
}
return;
}
repLog.info("Replication from {} started...", uri);
Timer1.Context<String> context = metrics.start(config.getName());
try {
long startedAt = context.getStartTime();
long delay = NANOSECONDS.toMillis(startedAt - createdAt);
metrics.record(config.getName(), delay, retryCount);
git = gitManager.openRepository(projectName);
runImpl();
long elapsed = NANOSECONDS.toMillis(context.stop());
repLog.info(
"Replication from {} completed in {}ms, {}ms delay, {} retries",
uri,
elapsed,
delay,
retryCount);
} catch (RepositoryNotFoundException e) {
stateLog.error(
"Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
getStatesAsArray());
} catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
// Tried to replicate to a remote via anonymous git:// but the repository
// does not exist. In this case NoRemoteRepositoryException is not
// raised.
String msg = e.getMessage();
repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
} catch (TransportException e) {
Throwable cause = e.getCause();
if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
repLog.error("Cannot replicate from {}: {}", uri, cause.getMessage());
} else if (e instanceof LockFailureException) {
lockRetryCount++;
// The LockFailureException message contains both URI and reason
// for this failure.
repLog.error("Cannot replicate from {}: {}", uri, e.getMessage());
// The remote fetch operation should be retried.
if (lockRetryCount <= maxLockRetries) {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
}
} else {
repLog.error(
"Giving up after {} occurrences of this error: {} during replication from {}",
lockRetryCount,
e.getMessage(),
uri);
}
} else {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
repLog.error("Cannot replicate from {}", uri, e);
// The remote fetch operation should be retried.
pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
}
}
} catch (IOException e) {
stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
} catch (RuntimeException | Error e) {
stateLog.error("Unexpected error during replication from " + uri, e, getStatesAsArray());
} finally {
if (git != null) {
git.close();
}
pool.notifyFinished(this);
}
}
private void logCanceledWhileRunningException(TransportException e) {
repLog.info("Cannot replicate from {}. It was canceled while running", uri, e);
}
private void runImpl() throws IOException {
Fetch fetch = fetchFactory.create(uri, git);
List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
updateStates(fetch.fetch(fetchRefSpecs));
}
private List<RefSpec> getFetchRefSpecs() {
if (delta.isEmpty()) {
return config.getFetchRefSpecs();
}
return delta.stream().map(ref -> new RefSpec(ref + ":" + ref)).collect(Collectors.toList());
}
private void updateStates(List<RefUpdateState> refUpdates) throws IOException {
Set<String> doneRefs = new HashSet<>();
boolean anyRefFailed = false;
RefUpdate.Result lastRefUpdateResult = RefUpdate.Result.NO_CHANGE;
for (RefUpdateState u : refUpdates) {
ReplicationState.RefFetchResult fetchStatus = ReplicationState.RefFetchResult.SUCCEEDED;
Set<ReplicationState> logStates = new HashSet<>();
lastRefUpdateResult = u.getResult();
logStates.addAll(stateMap.get(u.getRemoteName()));
logStates.addAll(stateMap.get(ALL_REFS));
ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
doneRefs.add(u.getRemoteName());
switch (u.getResult()) {
case NO_CHANGE:
case NEW:
case FORCED:
case RENAMED:
case FAST_FORWARD:
break;
case NOT_ATTEMPTED:
case REJECTED:
case REJECTED_CURRENT_BRANCH:
case REJECTED_MISSING_OBJECT:
stateLog.error(
String.format(
"Failed replicate %s from %s: result %s", uri, u.getRemoteName(), u.getResult()),
logStatesArray);
fetchStatus = ReplicationState.RefFetchResult.FAILED;
anyRefFailed = true;
break;
case LOCK_FAILURE:
throw new LockFailureException(uri, u.toString());
case IO_FAILURE:
throw new IOException(u.toString());
case REJECTED_OTHER_REASON:
stateLog.error(
String.format(
"Failed replicate %s from %s, reason: %s", uri, u.getRemoteName(), u.toString()),
logStatesArray);
fetchStatus = ReplicationState.RefFetchResult.FAILED;
anyRefFailed = true;
break;
}
for (ReplicationState rs : getStatesByRef(u.getRemoteName())) {
rs.notifyRefReplicated(
projectName.get(), u.getRemoteName(), uri, fetchStatus, u.getResult());
}
}
doneRefs.add(ALL_REFS);
for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
rs.notifyRefReplicated(
projectName.get(),
ALL_REFS,
uri,
anyRefFailed
? ReplicationState.RefFetchResult.FAILED
: ReplicationState.RefFetchResult.SUCCEEDED,
lastRefUpdateResult);
}
for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
if (!doneRefs.contains(entry.getKey())) {
entry
.getValue()
.notifyRefReplicated(
projectName.get(),
entry.getKey(),
uri,
ReplicationState.RefFetchResult.NOT_ATTEMPTED,
null);
}
}
stateMap.clear();
}
public static class LockFailureException extends TransportException {
private static final long serialVersionUID = 1L;
LockFailureException(URIish uri, String message) {
super(uri, message);
}
}
}