Merge branch 'stable-3.1' into stable-3.2
* stable-3.1:
ReplicationConfig: Add missing JavaDoc
Add method to push changes directly to given replica
Change-Id: I61d49ebb90353fb8f073866cc2525f17fcf3a866
diff --git a/BUILD b/BUILD
index 50615d8..14b8876 100644
--- a/BUILD
+++ b/BUILD
@@ -21,6 +21,7 @@
junit_tests(
name = "replication_tests",
+ timeout = "long",
srcs = glob([
"src/test/java/**/*Test.java",
"src/test/java/**/*IT.java",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index fe5dbad..782ff4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@
}
@Override
+ public int getDistributionInterval() {
+ return currentConfig.getDistributionInterval();
+ }
+
+ @Override
public synchronized int getMaxRefsToLog() {
return currentConfig.getMaxRefsToLog();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index fa26e82..424648e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -63,7 +63,8 @@
return true;
}
- repLog.warn("Cannot create new project {} on remote site {}.", projectName, replicateURI);
+ repLog.atWarning().log(
+ "Cannot create new project %s on remote site %s.", projectName, replicateURI);
return false;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index 4617672..8ea7227 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -55,7 +55,7 @@
return;
}
- repLog.warn("Cannot delete project {} on remote site {}.", project, replicateURI);
+ repLog.atWarning().log("Cannot delete project %s on remote site %s.", project, replicateURI);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 85c3917..936bb2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
@@ -30,6 +31,7 @@
import com.google.gerrit.entities.BranchNameKey;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.exceptions.StorageException;
import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
@@ -51,6 +53,7 @@
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.util.logging.NamedFluentLogger;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
@@ -64,10 +67,13 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -81,10 +87,9 @@
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
public class Destination {
- private static final Logger repLog = ReplicationQueue.repLog;
+ private static final NamedFluentLogger repLog = ReplicationQueue.repLog;
private static final String PROJECT_NOT_AVAILABLE = "source project %s not available";
@@ -94,7 +99,9 @@
private final ReplicationStateListener stateLog;
private final Object stateLock = new Object();
- private final Map<URIish, PushOne> pending = new HashMap<>();
+ // writes are covered by the stateLock, but some reads are still
+ // allowed without the lock
+ private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>();
private final Map<URIish, PushOne> inFlight = new HashMap<>();
private final PushOne.Factory opFactory;
private final DeleteProjectTask.Factory deleteProjectFactory;
@@ -156,7 +163,7 @@
builder.add(g.getUUID());
addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
} else {
- repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
+ repLog.atWarning().log("Group \"%s\" not recognized, removing from authGroup", name);
}
}
remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
@@ -240,11 +247,9 @@
int numInFlight = inFlight.size();
if (numPending > 0 || numInFlight > 0) {
- repLog.warn(
- "Cancelling replication events (pending={}, inFlight={}) for destination {}",
- numPending,
- numInFlight,
- getRemoteConfigName());
+ repLog.atWarning().log(
+ "Cancelling replication events (pending=%d, inFlight=%d) for destination %s",
+ numPending, numInFlight, getRemoteConfigName());
foreachPushOp(
pending,
@@ -280,7 +285,8 @@
if (!config.replicateHiddenProjects()
&& state.getProject().getState()
== com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
- repLog.debug("Project {} is hidden and replication of hidden projects is disabled", name);
+ repLog.atFine().log(
+ "Project %s is hidden and replication of hidden projects is disabled", name);
return false;
}
@@ -293,10 +299,9 @@
permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
return true;
} catch (AuthException e) {
- repLog.debug(
- "Project {} is not visible to current user {}",
- name,
- user.getUserName().orElse("unknown"));
+ repLog.atFine().log(
+ "Project %s is not visible to current user %s",
+ name, user.getUserName().orElse("unknown"));
return false;
}
}
@@ -309,21 +314,22 @@
() -> {
ProjectState projectState;
try {
- projectState = projectCache.checkedGet(project);
- } catch (IOException e) {
- repLog.warn("Error reading project {} from cache", project, e);
+ projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+ } catch (StorageException e) {
+ repLog.atWarning().withCause(e).log(
+ "Error reading project %s from cache", project);
return false;
}
if (projectState == null) {
- repLog.debug("Project {} does not exist", project);
+ repLog.atFine().log("Project %s does not exist", project);
throw new NoSuchProjectException(project);
}
if (!projectState.statePermitsRead()) {
- repLog.debug("Project {} does not permit read", project);
+ repLog.atFine().log("Project %s does not permit read", project);
return false;
}
if (!shouldReplicate(projectState, userProvider.get())) {
- repLog.debug("Project {} should not be replicated", project);
+ repLog.atFine().log("Project %s should not be replicated", project);
return false;
}
if (PushOne.ALL_REFS.equals(ref)) {
@@ -336,11 +342,9 @@
.ref(ref)
.check(RefPermission.READ);
} catch (AuthException e) {
- repLog.debug(
- "Ref {} on project {} is not visible to calling user {}",
- ref,
- project,
- userProvider.get().getUserName().orElse("unknown"));
+ repLog.atFine().log(
+ "Ref %s on project %s is not visible to calling user %s",
+ ref, project, userProvider.get().getUserName().orElse("unknown"));
return false;
}
return true;
@@ -362,13 +366,10 @@
() -> {
ProjectState projectState;
try {
- projectState = projectCache.checkedGet(project);
- } catch (IOException e) {
+ projectState = projectCache.get(project).orElseThrow(noSuchProject(project));
+ } catch (StorageException e) {
return false;
}
- if (projectState == null) {
- throw new NoSuchProjectException(project);
- }
return shouldReplicate(projectState, userProvider.get());
})
.call();
@@ -388,10 +389,10 @@
void schedule(
Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
if (!shouldReplicate(project, ref, state)) {
- repLog.debug("Not scheduling replication {}:{} => {}", project, ref, uri);
+ repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
return;
}
- repLog.info("scheduling replication {}:{} => {}", project, ref, uri);
+ repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri);
if (!config.replicatePermissions()) {
PushOne e;
@@ -433,7 +434,8 @@
task.addState(ref, state);
}
state.increasePushTaskCount(project.get(), ref);
- repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
+ repLog.atInfo().log(
+ "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay());
}
}
@@ -559,6 +561,7 @@
pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
} else {
pushOp.canceledByReplication();
+ pushOp.retryDone();
pending.remove(uri);
stateLog.error(
"Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
@@ -580,7 +583,9 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
- replicationTasksStorage.get().start(op);
+ if (!replicationTasksStorage.get().start(op)) {
+ return RunwayStatus.deniedExternal();
+ }
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
@@ -595,9 +600,20 @@
}
}
+ public Set<String> getPrunableTaskNames() {
+ Set<String> names = new HashSet<>();
+ for (PushOne push : pending.values()) {
+ if (!replicationTasksStorage.get().isWaiting(push)) {
+ repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
+ names.add(push.toString());
+ }
+ }
+ return names;
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
- repLog.debug("Skipping replication of project {}", project.get());
+ repLog.atFine().log("Skipping replication of project %s", project.get());
return false;
}
@@ -609,7 +625,8 @@
boolean matches = (new ReplicationFilter(projects)).matches(project);
if (!matches) {
- repLog.debug("Skipping replication of project {}; does not match filter", project.get());
+ repLog.atFine().log(
+ "Skipping replication of project %s; does not match filter", project.get());
}
return matches;
}
@@ -620,7 +637,7 @@
boolean wouldPushRef(String ref) {
if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
- repLog.debug("Skipping push of ref {}; it is a meta ref", ref);
+ repLog.atFine().log("Skipping push of ref %s; it is a meta ref", ref);
return false;
}
if (PushOne.ALL_REFS.equals(ref)) {
@@ -631,7 +648,7 @@
return true;
}
}
- repLog.debug("Skipping push of ref {}; it does not match push ref specs", ref);
+ repLog.atFine().log("Skipping push of ref %s; it does not match push ref specs", ref);
return false;
}
@@ -671,7 +688,7 @@
} else if (remoteNameStyle.equals("basenameOnly")) {
name = FilenameUtils.getBaseName(name);
} else if (!remoteNameStyle.equals("slash")) {
- repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+ repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle);
}
String replacedPath = replaceName(template.getPath(), name, isSingleProjectMatch());
return (replacedPath != null) ? template.setPath(replacedPath) : template;
@@ -757,7 +774,7 @@
try {
eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
- repLog.error("error posting event", e);
+ repLog.atSevere().withCause(e).log("error posting event");
}
}
}
@@ -771,7 +788,7 @@
try {
eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
- repLog.error("error posting event", e);
+ repLog.atSevere().withCause(e).log("error posting event");
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
index 4050c9c..9b6d431 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
@@ -54,6 +56,11 @@
continue;
}
+ if (!c.getFetchRefSpecs().isEmpty()) {
+ repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName());
+ continue;
+ }
+
// If destination for push is not set assume equal to source.
for (RefSpec ref : c.getPushRefSpecs()) {
if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index fcf5dfa..ecfbb8e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -106,7 +106,7 @@
try {
uri = new URIish(url);
} catch (URISyntaxException e) {
- repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+ repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage());
continue;
}
@@ -114,13 +114,14 @@
String path =
replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
if (path == null) {
- repLog.warn("adminURL {} does not contain ${name}", uri);
+ repLog.atWarning().log("adminURL %s does not contain ${name}", uri);
continue;
}
uri = uri.setPath(path);
if (!isSSH(uri)) {
- repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+ repLog.atWarning().log(
+ "adminURL '%s' is invalid: only SSH and HTTP are supported", uri);
continue;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 4cc9974..5b24bf5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@
}
@Override
+ public int getDistributionInterval() {
+ return replicationConfig.getDistributionInterval();
+ }
+
+ @Override
public String getVersion() {
Hasher hasher = Hashing.murmur3_128().newHasher();
hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index 66130f9..fa81dd0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -62,34 +62,34 @@
@Override
public boolean createProject(Project.NameKey project, String head) {
- repLog.info("Creating project {} on {}", project, uri);
+ repLog.atInfo().log("Creating project %s on %s", project, uri);
String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
try {
return httpClient
.execute(new HttpPut(url), new HttpResponseHandler(), getContext())
.isSuccessful();
} catch (IOException e) {
- repLog.error("Couldn't perform project creation on {}", uri, e);
+ repLog.atSevere().withCause(e).log("Couldn't perform project creation on %s", uri);
return false;
}
}
@Override
public boolean deleteProject(Project.NameKey project) {
- repLog.info("Deleting project {} on {}", project, uri);
+ repLog.atInfo().log("Deleting project %s on %s", project, uri);
String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
try {
httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext());
return true;
} catch (IOException e) {
- repLog.error("Couldn't perform project deletion on {}", uri, e);
+ repLog.atSevere().withCause(e).log("Couldn't perform project deletion on %s", uri);
}
return false;
}
@Override
public boolean updateHead(Project.NameKey project, String newHead) {
- repLog.info("Updating head of {} on {}", project, uri);
+ repLog.atInfo().log("Updating head of %s on %s", project, uri);
String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get()));
try {
HttpPut req = new HttpPut(url);
@@ -98,7 +98,7 @@
httpClient.execute(req, new HttpResponseHandler(), getContext());
return true;
} catch (IOException e) {
- repLog.error("Couldn't perform update head on {}", uri, e);
+ repLog.atSevere().withCause(e).log("Couldn't perform update head on %s", uri);
}
return false;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index da960e6..b092363 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -43,9 +43,9 @@
u.disableRefLog();
u.link(head);
}
- repLog.info("Created local repository: {}", uri);
+ repLog.atInfo().log("Created local repository: %s", uri);
} catch (IOException e) {
- repLog.error("Error creating local repository {}", uri.getPath(), e);
+ repLog.atSevere().withCause(e).log("Error creating local repository %s", uri.getPath());
return false;
}
return true;
@@ -55,9 +55,9 @@
public boolean deleteProject(Project.NameKey project) {
try {
recursivelyDelete(new File(uri.getPath()));
- repLog.info("Deleted local repository: {}", uri);
+ repLog.atInfo().log("Deleted local repository: %s", uri);
} catch (IOException e) {
- repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+ repLog.atSevere().withCause(e).log("Error deleting local repository %s:\n", uri.getPath());
return false;
}
return true;
@@ -71,7 +71,8 @@
u.link(newHead);
}
} catch (IOException e) {
- repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+ repLog.atSevere().withCause(e).log(
+ "Failed to update HEAD of repository %s to %s", uri.getPath(), newHead);
return false;
}
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 634f44d..9ccb70c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.common.flogger.LazyArgs.lazy;
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -37,6 +38,7 @@
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.logging.TraceContext;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -56,6 +58,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
@@ -76,7 +79,6 @@
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.URIish;
-import org.slf4j.MDC;
/**
* A push to remote operation started by {@link GitReferenceUpdatedListener}.
@@ -87,7 +89,7 @@
class PushOne implements ProjectRunnable, CanceledWhileRunning {
private final ReplicationStateListener stateLog;
static final String ALL_REFS = "..all..";
- static final String ID_MDC_KEY = "pushOneId";
+ static final String ID_KEY = "pushOneId";
interface Factory {
PushOne create(Project.NameKey d, URIish u);
@@ -169,17 +171,16 @@
@Override
public void cancel() {
- repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
+ repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI());
canceledByReplication();
pool.pushWasCanceled(this);
}
@Override
public void setCanceledWhileRunning() {
- repLog.info(
- "Replication [{}] to {} was canceled while being executed",
- HexFormat.fromInt(id),
- getURI());
+ repLog.atInfo().log(
+ "Replication [%s] to %s was canceled while being executed",
+ HexFormat.fromInt(id), getURI());
canceledWhileRunning.set(true);
}
@@ -218,7 +219,7 @@
return maxRetries == 0 || retryCount <= maxRetries;
}
- private void retryDone() {
+ void retryDone() {
this.retrying = false;
}
@@ -238,9 +239,10 @@
if (ALL_REFS.equals(ref)) {
delta.clear();
pushAllRefs = true;
- repLog.trace("Added all refs for replication to {}", uri);
+ repLog.atFinest().log("Added all refs for replication to %s", uri);
} else if (!pushAllRefs && delta.add(ref)) {
- repLog.trace("Added ref {} for replication to {}", ref, uri);
+ delta.add(ref);
+ repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
}
}
@@ -317,28 +319,35 @@
}
private void runPushOperation() {
+ try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
+ doRunPushOperation();
+ }
+ }
+
+ private void doRunPushOperation() {
// Lock the queue, and remove ourselves, so we can't be modified once
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
- MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
RunwayStatus status = pool.requestRunway(this);
isCollision = false;
if (!status.isAllowed()) {
if (status.isCanceled()) {
- repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+ repLog.atInfo().log(
+ "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
+ } else if (status.isExternalInflight()) {
+ repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
} else {
- repLog.info(
- "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
- uri,
- HexFormat.fromInt(status.getInFlightPushId()));
+ repLog.atInfo().log(
+ "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
+ uri, HexFormat.fromInt(status.getInFlightPushId()));
pool.reschedule(this, Destination.RetryReason.COLLISION);
isCollision = true;
}
return;
}
- repLog.info("Replication to {} started...", uri);
+ repLog.atInfo().log("Replication to %s started...", uri);
Timer1.Context<String> destinationContext = metrics.start(config.getName());
try {
long startedAt = destinationContext.getStartTime();
@@ -353,12 +362,9 @@
config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
}
retryDone();
- repLog.info(
- "Replication to {} completed in {}ms, {}ms delay, {} retries",
- uri,
- elapsed,
- delay,
- retryCount);
+ repLog.atInfo().log(
+ "Replication to %s completed in %dms, %dms delay, %d retries",
+ uri, elapsed, delay, retryCount);
} catch (RepositoryNotFoundException e) {
stateLog.error(
"Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
@@ -375,7 +381,7 @@
|| msg.contains("unavailable")) {
createRepository();
} else {
- repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
+ repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg);
}
} catch (NoRemoteRepositoryException e) {
@@ -385,10 +391,10 @@
} catch (TransportException e) {
Throwable cause = e.getCause();
if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
- repLog.error("Cannot replicate to {}: {}", uri, cause.getMessage());
+ repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
} else if (e instanceof LockFailureException) {
lockRetryCount++;
- repLog.error("Cannot replicate to {} due to lock failure", uri);
+ repLog.atSevere().log("Cannot replicate to %s due to lock failure", uri);
// The remote push operation should be retried.
if (lockRetryCount <= maxLockRetries) {
@@ -398,14 +404,14 @@
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
} else {
- repLog.error(
- "Giving up after {} lock failures during replication to {}", lockRetryCount, uri);
+ repLog.atSevere().log(
+ "Giving up after %d lock failures during replication to %s", lockRetryCount, uri);
}
} else {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
- repLog.error("Cannot replicate to {}", uri, e);
+ repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri);
// The remote push operation should be retried.
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
@@ -423,7 +429,7 @@
}
private void logCanceledWhileRunningException(TransportException e) {
- repLog.info("Cannot replicate to {}. It was canceled while running", uri, e);
+ repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri);
}
private void createRepository() {
@@ -431,10 +437,11 @@
try {
Ref head = git.exactRef(Constants.HEAD);
if (createProject(projectName, head != null ? getName(head) : null)) {
- repLog.warn("Missing repository created; retry replication to {}", uri);
+ repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
} else {
- repLog.warn("Missing repository could not be created when replicating {}", uri);
+ repLog.atWarning().log(
+ "Missing repository could not be created when replicating %s", uri);
}
} catch (IOException ioe) {
stateLog.error(
@@ -481,14 +488,14 @@
}
if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) {
- repLog.info("Push to {} references: {}", uri, refUpdatesForLogging(todo));
+ repLog.atInfo().log("Push to %s references: %s", uri, lazy(() -> refUpdatesForLogging(todo)));
} else {
- repLog.info(
- "Push to {} references (first {} of {} listed): {}",
+ repLog.atInfo().log(
+ "Push to %s references (first %d of %d listed): %s",
uri,
replConfig.getMaxRefsToLog(),
todo.size(),
- refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())));
+ lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
}
return tn.push(NullProgressMonitor.INSTANCE, todo);
@@ -523,8 +530,8 @@
private List<RemoteRefUpdate> generateUpdates(Transport tn)
throws IOException, PermissionBackendException {
- ProjectState projectState = projectCache.checkedGet(projectName);
- if (projectState == null) {
+ Optional<ProjectState> projectState = projectCache.get(projectName);
+ if (!projectState.isPresent()) {
return Collections.emptyList();
}
@@ -533,7 +540,7 @@
boolean filter;
PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
try {
- projectState.checkStatePermitsRead();
+ projectState.get().checkStatePermitsRead();
forProject.check(ProjectPermission.READ);
filter = false;
} catch (AuthException | ResourceConflictException e) {
@@ -553,7 +560,11 @@
}
local = n;
}
- local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+ local =
+ forProject
+ .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+ .stream()
+ .collect(toMap(Ref::getName, r -> r));
}
List<RemoteRefUpdate> remoteUpdatesList =
@@ -570,7 +581,7 @@
Map<String, Ref> remote = listRemote(tn);
for (Ref src : local.values()) {
if (!canPushRef(src.getName(), noPerms)) {
- repLog.debug("Skipping push of ref {}", src.getName());
+ repLog.atFine().log("Skipping push of ref %s", src.getName());
continue;
}
@@ -587,7 +598,7 @@
if (config.isMirror()) {
for (Ref ref : remote.values()) {
if (Constants.HEAD.equals(ref.getName())) {
- repLog.debug("Skipping deletion of {}", ref.getName());
+ repLog.atFine().log("Skipping deletion of %s", ref.getName());
continue;
}
RefSpec spec = matchDst(ref.getName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index 7538298..f96c157 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -42,17 +42,14 @@
OutputStream errStream = sshHelper.newErrorBufferStream();
try {
sshHelper.executeRemoteSsh(uri, cmd, errStream);
- repLog.info("Created remote repository: {}", uri);
+ repLog.atInfo().log("Created remote repository: %s", uri);
} catch (IOException e) {
- repLog.error(
- "Error creating remote repository at {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- e,
- cmd,
- errStream);
+ repLog.atSevere().log(
+ "Error creating remote repository at %s:\n"
+ + " Exception: %s\n"
+ + " Command: %s\n"
+ + " Output: %s",
+ uri, e, cmd, errStream);
return false;
}
return true;
@@ -65,17 +62,14 @@
OutputStream errStream = sshHelper.newErrorBufferStream();
try {
sshHelper.executeRemoteSsh(uri, cmd, errStream);
- repLog.info("Deleted remote repository: {}", uri);
+ repLog.atInfo().log("Deleted remote repository: %s", uri);
} catch (IOException e) {
- repLog.error(
- "Error deleting remote repository at {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- e,
- cmd,
- errStream);
+ repLog.atSevere().log(
+ "Error deleting remote repository at %s:\n"
+ + " Exception: %s\n"
+ + " Command: %s\n"
+ + " Output: %s",
+ uri, e, cmd, errStream);
return false;
}
return true;
@@ -90,16 +84,12 @@
try {
sshHelper.executeRemoteSsh(uri, cmd, errStream);
} catch (IOException e) {
- repLog.error(
- "Error updating HEAD of remote repository at {} to {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- newHead,
- e,
- cmd,
- errStream);
+ repLog.atSevere().log(
+ "Error updating HEAD of remote repository at %s to %s:\n"
+ + " Exception: %s\n"
+ + " Command: %s\n"
+ + " Output: %s",
+ uri, newHead, e, cmd, errStream);
return false;
}
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index b978952..8bbb180 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@
boolean isDefaultForceUpdate();
/**
+ * Returns the interval in seconds for running task distribution.
+ *
+ * @return number of seconds, zero if never.
+ */
+ int getDistributionInterval();
+
+ /**
* Returns the maximum number of ref-specs to log into the replication_log whenever a push
* operation is completed against a replication end.
*
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index e99f6b1..2631cbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -47,9 +47,9 @@
try {
config.load();
} catch (ConfigInvalidException e) {
- repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e);
+ repLog.atSevere().withCause(e).log("Config file %s is invalid: %s", cfgPath, e.getMessage());
} catch (IOException e) {
- repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e);
+ repLog.atSevere().withCause(e).log("Cannot read %s: %s", cfgPath, e.getMessage());
}
this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
@@ -86,6 +86,11 @@
}
@Override
+ public int getDistributionInterval() {
+ return config.getInt("replication", "distributionInterval", 0);
+ }
+
+ @Override
public int getMaxRefsToLog() {
return maxRefsToLog;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
index fed09f9..60a9523 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
@@ -28,6 +28,6 @@
systemLog,
serverInfo,
ReplicationQueue.REPLICATION_LOG_NAME,
- new PatternLayout("[%d] [%X{" + PushOne.ID_MDC_KEY + "}] %m%n"));
+ new PatternLayout("[%d] %m%n"));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5fb97bd..21e4227 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
@@ -26,6 +28,7 @@
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.util.logging.NamedFluentLogger;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
@@ -35,9 +38,9 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Manages automatic replication to remote repositories. */
public class ReplicationQueue
@@ -47,10 +50,11 @@
ProjectDeletedListener,
HeadUpdatedListener {
static final String REPLICATION_LOG_NAME = "replication_log";
- static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
+ static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME);
private final ReplicationStateListener stateLog;
+ private final ReplicationConfig replConfig;
private final WorkQueue workQueue;
private final DynamicItem<EventDispatcher> dispatcher;
private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -58,14 +62,17 @@
private volatile boolean running;
private volatile boolean replaying;
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+ private Distributor distributor;
@Inject
ReplicationQueue(
+ ReplicationConfig rc,
WorkQueue wq,
Provider<ReplicationDestinations> rd,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
ReplicationTasksStorage rts) {
+ replConfig = rc;
workQueue = wq;
dispatcher = dis;
destinations = rd;
@@ -82,15 +89,17 @@
replicationTasksStorage.resetAll();
firePendingEvents();
fireBeforeStartupEvents();
+ distributor = new Distributor(workQueue);
}
}
@Override
public void stop() {
running = false;
+ distributor.stop();
int discarded = destinations.get().shutdown();
if (discarded > 0) {
- repLog.warn("Canceled {} replication events during shutdown", discarded);
+ repLog.atWarning().log("Canceled %d replication events during shutdown", discarded);
}
}
@@ -111,17 +120,17 @@
@VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+ fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- fire(event.getProjectName(), event.getRefName());
+ fire(event.getProjectName(), event.getRefName(), false);
}
- private void fire(String projectName, String refName) {
+ private void fire(String projectName, String refName, boolean isPersisted) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
- fire(Project.nameKey(projectName), null, refName, state, false);
+ fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
state.markAllPushTasksScheduled();
}
@@ -130,7 +139,8 @@
String urlMatch,
String refName,
ReplicationState state,
- boolean now) {
+ boolean now,
+ boolean isPersisted) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
@@ -140,13 +150,13 @@
}
for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
- pushReference(cfg, project, urlMatch, refName, state, now);
+ pushReference(cfg, project, urlMatch, refName, state, now, isPersisted);
}
}
@UsedAt(UsedAt.Project.COLLABNET)
public void pushReference(Destination cfg, Project.NameKey project, String refName) {
- pushReference(cfg, project, null, refName, null, true);
+ pushReference(cfg, project, null, refName, null, true, false);
}
private void pushReference(
@@ -155,19 +165,22 @@
String urlMatch,
String refName,
ReplicationState state,
- boolean now) {
+ boolean now,
+ boolean isPersisted) {
boolean withoutState = state == null;
if (withoutState) {
state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
}
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
- replicationTasksStorage.create(
- new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ if (!isPersisted) {
+ replicationTasksStorage.create(
+ new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ }
cfg.schedule(project, refName, uri, state, now);
}
} else {
- repLog.debug("Skipping ref {} on project {}", refName, project.get());
+ repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
}
if (withoutState) {
state.markAllPushTasksScheduled();
@@ -182,8 +195,8 @@
for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
String eventKey = String.format("%s:%s", t.project, t.ref);
if (!eventsReplayed.contains(eventKey)) {
- repLog.info("Firing pending task {}", eventKey);
- fire(t.project, t.ref);
+ repLog.atInfo().log("Firing pending task %s", eventKey);
+ fire(t.project, t.ref, true);
eventsReplayed.add(eventKey);
}
}
@@ -192,6 +205,30 @@
}
}
+ private void pruneCompleted() {
+ // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+ // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
+ // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+ // do use the same name as returned by toString() though, so that can be used to correlate
+ // PushOnes with queue tasks despite their wrappers.
+ Set<String> prunableTaskNames = new HashSet<>();
+ for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+ prunableTaskNames.addAll(destination.getPrunableTaskNames());
+ }
+
+ for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+ WorkQueue.Task.State state = task.getState();
+ if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+ if (task instanceof WorkQueue.ProjectTask) {
+ if (prunableTaskNames.contains(task.toString())) {
+ repLog.atFine().log("Pruning externally completed task: %s", task);
+ task.cancel(false);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -211,8 +248,8 @@
for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
- repLog.info("Firing pending task {}", event);
- fire(event.projectName(), event.refName());
+ repLog.atInfo().log("Firing pending task %s", event);
+ fire(event.projectName(), event.refName(), false);
eventsReplayed.add(eventKey);
}
}
@@ -229,4 +266,49 @@
public abstract String refName();
}
+
+ protected class Distributor implements WorkQueue.CancelableRunnable {
+ public ScheduledThreadPoolExecutor executor;
+ public ScheduledFuture<?> future;
+
+ public Distributor(WorkQueue wq) {
+ int distributionInterval = replConfig.getDistributionInterval();
+ if (distributionInterval > 0) {
+ executor = wq.createQueue(1, "Replication Distribution", false);
+ future =
+ executor.scheduleWithFixedDelay(
+ this, distributionInterval, distributionInterval, SECONDS);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!running) {
+ return;
+ }
+ try {
+ firePendingEvents();
+ pruneCompleted();
+ } catch (Exception e) {
+ repLog.atSevere().withCause(e).log("error distributing tasks");
+ }
+ }
+
+ @Override
+ public void cancel() {
+ future.cancel(true);
+ }
+
+ public void stop() {
+ if (executor != null) {
+ cancel();
+ executor.getQueue().remove(this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Replication Distributor";
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index f2d55de..3e73033 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@
@Override
public void warn(String msg, ReplicationState... states) {
stateWriteErr("Warning: " + msg, states);
- repLog.warn(msg);
+ repLog.atWarning().log(msg);
}
@Override
public void error(String msg, ReplicationState... states) {
stateWriteErr("Error: " + msg, states);
- repLog.error(msg);
+ repLog.atSevere().log(msg);
}
@Override
public void error(String msg, Throwable t, ReplicationState... states) {
stateWriteErr("Error: " + msg, states);
- repLog.error(msg, t);
+ repLog.atSevere().withCause(t).log(msg);
}
private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index d15f02d..5af0983 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -48,13 +49,19 @@
* task:
*
* <p><code>
- * .../building/<tmp_name> new replication tasks under construction
- * .../running/<sha1> running replication tasks
- * .../waiting/<sha1> outstanding replication tasks
+ * .../building/<tmp_name> new replication tasks under construction
+ * .../running/<uri_sha1>/ lock for URI
+ * .../running/<uri_sha1>/<task_sha1> running replication tasks
+ * .../waiting/<task_sha1_NN_shard>/<task_sha1> outstanding replication tasks
* </code>
*
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
* <p>Tasks are moved atomically via a rename between those directories to indicate the current
* state of each task.
+ *
+ * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
+ * failures when moving tasks to and from the shard directories from different hosts concurrently.
*/
@Singleton
public class ReplicationTasksStorage {
@@ -62,26 +69,50 @@
private boolean disableDeleteForTesting;
- public static class ReplicateRefUpdate {
- public final String project;
+ public static class ReplicateRefUpdate extends UriUpdate {
public final String ref;
- public final String uri;
- public final String remote;
- public ReplicateRefUpdate(PushOne push, String ref) {
- this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+ public ReplicateRefUpdate(UriUpdate update, String ref) {
+ this(update.project, ref, update.uri, update.remote);
}
public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
- this.project = project;
+ this(project, ref, uri.toASCIIString(), remote);
+ }
+
+ protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+ super(project, uri, remote);
this.ref = ref;
- this.uri = uri.toASCIIString();
+ }
+
+ @Override
+ public String toString() {
+ return "ref-update " + ref + " (" + super.toString() + ")";
+ }
+ }
+
+ public static class UriUpdate {
+ public final String project;
+ public final String uri;
+ public final String remote;
+
+ public UriUpdate(PushOne push) {
+ this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+ }
+
+ public UriUpdate(String project, URIish uri, String remote) {
+ this(project, uri.toASCIIString(), remote);
+ }
+
+ public UriUpdate(String project, String uri, String remote) {
+ this.project = project;
+ this.uri = uri;
this.remote = remote;
}
@Override
public String toString() {
- return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+ return "uri-update " + project + " uri:" + uri + " remote:" + remote;
}
}
@@ -109,28 +140,65 @@
this.disableDeleteForTesting = deleteDisabled;
}
- public synchronized void start(PushOne push) {
- for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).start();
+ public synchronized boolean start(PushOne push) {
+ UriLock lock = new UriLock(push);
+ if (!lock.acquire()) {
+ return false;
}
+
+ boolean started = false;
+ for (String ref : push.getRefs()) {
+ started = new Task(lock, ref).start() || started;
+ }
+
+ if (!started) { // No tasks left, likely replicated externally
+ lock.release();
+ }
+ return started;
}
public synchronized void reset(PushOne push) {
+ UriLock lock = new UriLock(push);
for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).reset();
+ new Task(lock, ref).reset();
}
+ lock.release();
}
public synchronized void resetAll() {
- for (ReplicateRefUpdate r : listRunning()) {
- new Task(r).reset();
+ try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+ for (Path dir : dirs) {
+ UriLock lock = null;
+ try {
+ for (ReplicateRefUpdate u : list(dir)) {
+ if (lock == null) {
+ lock = new UriLock(u);
+ }
+ new Task(u).reset();
+ }
+ } catch (DirectoryIteratorException d) {
+ // iterating over the sub-directories is expected to have dirs disappear
+ Nfs.throwIfNotStaleFileHandle(d.getCause());
+ }
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while aborting running tasks");
}
}
- public synchronized void finish(PushOne push) {
- for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).finish();
+ public boolean isWaiting(PushOne push) {
+ return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+ }
+
+ public void finish(PushOne push) {
+ UriLock lock = new UriLock(push);
+ for (ReplicateRefUpdate r : list(lock.runningDir)) {
+ new Task(lock, r.ref).finish();
}
+ lock.release();
}
public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -195,20 +263,73 @@
}
}
+ private class UriLock {
+ public final UriUpdate update;
+ public final String uriKey;
+ public final Path runningDir;
+
+ public UriLock(PushOne push) {
+ this(new UriUpdate(push));
+ }
+
+ public UriLock(UriUpdate update) {
+ this.update = update;
+ uriKey = sha1(update.uri).name();
+ runningDir = createDir(runningUpdates).resolve(uriKey);
+ }
+
+ public boolean acquire() {
+ try {
+ logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+ Files.createDirectory(runningDir);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ return false; // already running, likely externally
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+ return true; // safer to risk a duplicate than to skip it
+ }
+ }
+
+ public void release() {
+ try {
+ if (disableDeleteForTesting && Files.list(runningDir).findFirst().isPresent()) {
+ logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+ return;
+ }
+
+ logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+ Files.delete(runningDir);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+ }
+ }
+
+ private String updateLog() {
+ return String.format("(%s => %s)", update.project, update.uri);
+ }
+ }
+
private class Task {
public final ReplicateRefUpdate update;
- public final String json;
public final String taskKey;
public final Path running;
public final Path waiting;
- public Task(ReplicateRefUpdate update) {
- this.update = update;
- json = GSON.toJson(update) + "\n";
+ public Task(ReplicateRefUpdate r) {
+ this(new UriLock(r), r.ref);
+ }
+
+ public Task(PushOne push, String ref) {
+ this(new UriLock(push), ref);
+ }
+
+ public Task(UriLock lock, String ref) {
+ update = new ReplicateRefUpdate(lock.update, ref);
String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
taskKey = sha1(key).name();
- running = createDir(runningUpdates).resolve(taskKey);
- waiting = createDir(waitingUpdates).resolve(taskKey);
+ running = lock.runningDir.resolve(taskKey);
+ waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
}
public String create() {
@@ -216,6 +337,7 @@
return taskKey;
}
+ String json = GSON.toJson(update) + "\n";
try {
Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -228,14 +350,19 @@
return taskKey;
}
- public void start() {
+ public boolean start() {
rename(waiting, running);
+ return Files.exists(running);
}
public void reset() {
rename(running, waiting);
}
+ public boolean isWaiting() {
+ return Files.exists(waiting);
+ }
+
public void finish() {
if (disableDeleteForTesting) {
logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -246,7 +373,7 @@
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
return new RunwayStatus(false, inFlightPushId);
}
+ public static RunwayStatus deniedExternal() {
+ return new RunwayStatus(false, -1);
+ }
+
private final boolean allowed;
private final int inFlightPushId;
@@ -43,6 +47,10 @@
return !allowed && inFlightPushId == 0;
}
+ public boolean isExternalInflight() {
+ return !allowed && inFlightPushId == -1;
+ }
+
public int getInFlightPushId() {
return inFlightPushId;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index ffa6be1..fdbd5e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -58,7 +58,8 @@
return;
}
- repLog.warn("Cannot update HEAD of project {} on remote site {}.", project, replicateURI);
+ repLog.atWarning().log(
+ "Cannot update HEAD of project %s on remote site %s.", project, replicateURI);
}
@Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0aad73b..fc3352d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
: Timeout for SSH connections. If 0, there is no timeout and
the client waits indefinitely. By default, 2 minutes.
+replication.distributionInterval
+: Interval in seconds for running the replication distributor. When
+ run, the replication distributor will add all persisted waiting tasks
+ to the queue to ensure that externally loaded tasks are visible to
+ the current process. If zero, turn off the replication distributor. By
+ default, zero.
+
+ Turning this on is likely only useful when there are other processes
+ (such as other masters in the same cluster) writing to the same
+ persistence store. To ensure that updates are seen well before their
+ replicationDelay expires when the distributor is used, the recommended
+ value for this is approximately the smallest remote.NAME.replicationDelay
+ divided by 5.
+
replication.lockErrorMaxRetries
: Number of times to retry a replication operation if a lock
error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c09fcd1..94f0dc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -245,9 +246,9 @@
return push;
}
- private void setupProjectCacheMock() throws IOException {
+ private void setupProjectCacheMock() {
projectCacheMock = mock(ProjectCache.class);
- when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
+ when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock));
}
private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index efacae7..79b05cc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@
assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
}
+
+ @Test
+ public void shouldSkipFetchRefSpecs() throws Exception {
+ FileBasedConfig config = newReplicationConfig();
+ String pushRemote = "pushRemote";
+ final String aRemoteURL = "ssh://somewhere/${name}.git";
+ config.setString("remote", pushRemote, "url", aRemoteURL);
+
+ String fetchRemote = "fetchRemote";
+ config.setString("remote", fetchRemote, "url", aRemoteURL);
+ config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*");
+ config.save();
+
+ DestinationsCollection destinationsCollections =
+ newDestinationsCollections(newReplicationFileBasedConfig());
+ destinationsCollections.startup(workQueueMock);
+ List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
+ assertThat(destinations).hasSize(1);
+
+ assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL);
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 3eb6baf..6ad5558 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -35,6 +35,8 @@
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
import java.nio.file.DirectoryStream;
@@ -51,6 +53,7 @@
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.FS;
import org.junit.Test;
@@ -63,16 +66,24 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int TEST_REPLICATION_DELAY = 1;
private static final int TEST_REPLICATION_RETRY = 1;
+ private static final int TEST_REPLICATION_MAX_RETRIES = 1;
private static final Duration TEST_TIMEOUT =
Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
+ private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+ Duration.ofSeconds(
+ (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) * TEST_REPLICATION_MAX_RETRIES
+ + 10);
+
@Inject private SitePaths sitePaths;
@Inject private ProjectOperations projectOperations;
@Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
+ private DestinationsCollection destinationCollection;
private Path pluginDataDir;
private Path gitPath;
private Path storagePath;
private FileBasedConfig config;
+ private ReplicationConfig replicationConfig;
private ReplicationTasksStorage tasksStorage;
@Override
@@ -90,8 +101,10 @@
super.setUpTestPlugin();
pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+ replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
storagePath = pluginDataDir.resolve("ref-updates");
tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+ destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
cleanupReplicationTasks();
tasksStorage.disableDeleteForTesting(true);
}
@@ -400,6 +413,59 @@
waitUntil(() -> tasksStorage.listRunning().size() == 0);
}
+ @Test
+ public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception {
+ tasksStorage.disableDeleteForTesting(false);
+ setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS);
+ config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
+ config.save();
+ reloadConfig();
+ assertThat(tasksStorage.listRunning()).hasSize(0);
+ Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
+
+ waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
+ waitUntil(() -> isTaskCleanedUp());
+ }
+
+ @Test
+ public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries()
+ throws Exception {
+ String projectName = "task_cleanup_locks_project_cancelled";
+ String remoteDestination = "http://invalidurl:9090/";
+ URIish urish = new URIish(remoteDestination + projectName + ".git");
+ tasksStorage.disableDeleteForTesting(false);
+
+ setReplicationDestination(projectName, "replica", Optional.of(projectName));
+ // replace correct urls with invalid one to trigger retry
+ config.setString("remote", projectName, "url", remoteDestination + "${name}.git");
+ config.setInt("remote", projectName, "replicationMaxRetries", TEST_REPLICATION_MAX_RETRIES);
+ config.save();
+ reloadConfig();
+ Destination destination =
+ destinationCollection.getAll(FilterType.ALL).stream()
+ .filter(dest -> dest.getProjects().contains(projectName))
+ .findFirst()
+ .get();
+
+ waitUntil(() -> tasksStorage.listRunning().size() == 0);
+
+ createTestProject(projectName);
+
+ waitUntil(() -> isTaskRescheduled(destination.getQueueInfo(), urish));
+ // replicationRetry is set to 1 minute which is the minimum value. That's why it
+ // should be safe to get the pushOne object from pending, because it should be
+ // here for one minute.
+ PushOne pushOp = destination.getQueueInfo().pending.get(urish);
+
+ WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT);
+ waitUntil(() -> isTaskCleanedUp());
+ }
+
+ private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
+ PushOne pushOne = queue.pending.get(uri);
+ return pushOne == null ? false : pushOne.isRetrying();
+ }
+
private Ref getRef(Repository repo, String branchName) throws IOException {
return repo.getRefDatabase().exactRef(branchName);
}
@@ -499,6 +565,16 @@
}
}
+ private boolean isTaskCleanedUp() {
+ Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+ Path runningUpdates = refUpdates.resolve("running");
+ try {
+ return Files.list(runningUpdates).count() == 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
private boolean nonEmptyProjectExists(Project.NameKey name) {
try (Repository r = repoManager.openRepository(name)) {
return !r.getAllRefsByPeeledObjectId().isEmpty();