Merge branch 'stable-3.1'
* stable-3.1:
Fix ReplicationIT flakiness by listing all persistent tasks
Revert "ReplicationIT: Retry/timeout on assertion of replication tasks count"
Revert "ReplicationIT: Increase timeout for tests"
Change-Id: I67824461340b6a693df18de59e11572ceed8ed7a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 4c07f40..c043baf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -66,6 +66,11 @@
}
@Override
+ public int getDistributionInterval() {
+ return currentConfig.getDistributionInterval();
+ }
+
+ @Override
public synchronized int getMaxRefsToLog() {
return currentConfig.getMaxRefsToLog();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 92b07ed..1524c2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -64,6 +64,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -563,7 +564,9 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
- replicationTasksStorage.get().start(op);
+ if (!replicationTasksStorage.get().start(op)) {
+ return RunwayStatus.deniedExternal();
+ }
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
@@ -576,6 +579,17 @@
}
}
+ public Set<String> getPrunableTaskNames() {
+ Set<String> names = new HashSet<>();
+ for (PushOne push : pending.values()) {
+ if (!replicationTasksStorage.get().isWaiting(push)) {
+ repLog.debug("No longer isWaiting, can prune {}", push.getURI());
+ names.add(push.toString());
+ }
+ }
+ return names;
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b46c278..df6cc42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -319,6 +319,8 @@
if (!status.isAllowed()) {
if (status.isCanceled()) {
repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+ } else if (status.isExternalInflight()) {
+ repLog.info("PushOp for replication to {} was denied externally", uri);
} else {
repLog.info(
"Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
@@ -522,7 +524,11 @@
}
local = n;
}
- local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
+ local =
+ forProject
+ .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build())
+ .stream()
+ .collect(toMap(Ref::getName, r -> r));
}
List<RemoteRefUpdate> remoteUpdatesList =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index b981bc8..68fe430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -44,6 +44,13 @@
boolean isDefaultForceUpdate();
/**
+ * Returns the interval in seconds for running task distribution.
+ *
+ * @return number of seconds, zero if never.
+ */
+ int getDistributionInterval();
+
+ /**
* Returns the maximum number of ref-specs to log into the replication_log whenever a push
* operation is completed against a replication end.
*
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 554e441..d714376 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -76,6 +76,11 @@
}
@Override
+ public int getDistributionInterval() {
+ return config.getInt("replication", "distributionInterval", 0);
+ }
+
+ @Override
public int getMaxRefsToLog() {
return maxRefsToLog;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e9a60e4..0dcfc95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.replication;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
@@ -34,6 +36,8 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +54,7 @@
private final ReplicationStateListener stateLog;
+ private final ReplicationConfig replConfig;
private final WorkQueue workQueue;
private final DynamicItem<EventDispatcher> dispatcher;
private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
@@ -57,14 +62,17 @@
private volatile boolean running;
private volatile boolean replaying;
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+ private Distributor distributor;
@Inject
ReplicationQueue(
+ ReplicationConfig rc,
WorkQueue wq,
Provider<ReplicationDestinations> rd,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
ReplicationTasksStorage rts) {
+ replConfig = rc;
workQueue = wq;
dispatcher = dis;
destinations = rd;
@@ -81,12 +89,14 @@
replicationTasksStorage.resetAll();
firePendingEvents();
fireBeforeStartupEvents();
+ distributor = new Distributor(workQueue);
}
}
@Override
public void stop() {
running = false;
+ distributor.stop();
int discarded = destinations.get().shutdown();
if (discarded > 0) {
repLog.warn("Canceled {} replication events during shutdown", discarded);
@@ -110,17 +120,17 @@
@VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+ fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- fire(event.getProjectName(), event.getRefName());
+ fire(event.getProjectName(), event.getRefName(), false);
}
- private void fire(String projectName, String refName) {
+ private void fire(String projectName, String refName, boolean isPersisted) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
- fire(Project.nameKey(projectName), null, refName, state, false);
+ fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
state.markAllPushTasksScheduled();
}
@@ -129,7 +139,8 @@
String urlMatch,
String refName,
ReplicationState state,
- boolean now) {
+ boolean now,
+ boolean isPersisted) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
@@ -141,8 +152,10 @@
for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
- replicationTasksStorage.create(
- new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ if (!isPersisted) {
+ replicationTasksStorage.create(
+ new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ }
cfg.schedule(project, refName, uri, state, now);
}
}
@@ -158,7 +171,7 @@
String eventKey = String.format("%s:%s", t.project, t.ref);
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", eventKey);
- fire(t.project, t.ref);
+ fire(t.project, t.ref, true);
eventsReplayed.add(eventKey);
}
}
@@ -167,6 +180,30 @@
}
}
+ private void pruneCompleted() {
+ // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+ // We also cannot access them by taskId since PushOnes don't have a taskId; they do have
+ // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+ // do use the same name as returned by toString() though, so that can be used to correlate
+ // PushOnes with queue tasks despite their wrappers.
+ Set<String> prunableTaskNames = new HashSet<>();
+ for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+ prunableTaskNames.addAll(destination.getPrunableTaskNames());
+ }
+
+ for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+ WorkQueue.Task.State state = task.getState();
+ if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+ if (task instanceof WorkQueue.ProjectTask) {
+ if (prunableTaskNames.contains(task.toString())) {
+ repLog.debug("Pruning externally completed task:{}", task);
+ task.cancel(false);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
Project.NameKey p = Project.nameKey(event.getProjectName());
@@ -187,7 +224,7 @@
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", event);
- fire(event.projectName(), event.refName());
+ fire(event.projectName(), event.refName(), false);
eventsReplayed.add(eventKey);
}
}
@@ -204,4 +241,49 @@
public abstract String refName();
}
+
+ protected class Distributor implements WorkQueue.CancelableRunnable {
+ public ScheduledThreadPoolExecutor executor;
+ public ScheduledFuture<?> future;
+
+ public Distributor(WorkQueue wq) {
+ int distributionInterval = replConfig.getDistributionInterval();
+ if (distributionInterval > 0) {
+ executor = wq.createQueue(1, "Replication Distribution", false);
+ future =
+ executor.scheduleWithFixedDelay(
+ this, distributionInterval, distributionInterval, SECONDS);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!running) {
+ return;
+ }
+ try {
+ firePendingEvents();
+ pruneCompleted();
+ } catch (Exception e) {
+ repLog.error("error distributing tasks", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ future.cancel(true);
+ }
+
+ public void stop() {
+ if (executor != null) {
+ cancel();
+ executor.getQueue().remove(this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Replication Distributor";
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5564925..e827d4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@
import com.google.inject.Singleton;
import java.io.IOException;
import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -46,11 +47,14 @@
* task:
*
* <p><code>
- * .../building/<tmp_name> new replication tasks under construction
- * .../running/<sha1> running replication tasks
- * .../waiting/<sha1> outstanding replication tasks
+ * .../building/<tmp_name> new replication tasks under construction
+ * .../running/<uri_sha1>/ lock for URI
+ * .../running/<uri_sha1>/<task_sha1> running replication tasks
+ * .../waiting/<task_sha1> outstanding replication tasks
* </code>
*
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
* <p>Tasks are moved atomically via a rename between those directories to indicate the current
* state of each task.
*/
@@ -60,26 +64,50 @@
private boolean disableDeleteForTesting;
- public static class ReplicateRefUpdate {
- public final String project;
+ public static class ReplicateRefUpdate extends UriUpdate {
public final String ref;
- public final String uri;
- public final String remote;
- public ReplicateRefUpdate(PushOne push, String ref) {
- this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+ public ReplicateRefUpdate(UriUpdate update, String ref) {
+ this(update.project, ref, update.uri, update.remote);
}
public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
- this.project = project;
+ this(project, ref, uri.toASCIIString(), remote);
+ }
+
+ protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+ super(project, uri, remote);
this.ref = ref;
- this.uri = uri.toASCIIString();
+ }
+
+ @Override
+ public String toString() {
+ return "ref-update " + ref + " (" + super.toString() + ")";
+ }
+ }
+
+ public static class UriUpdate {
+ public final String project;
+ public final String uri;
+ public final String remote;
+
+ public UriUpdate(PushOne push) {
+ this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+ }
+
+ public UriUpdate(String project, URIish uri, String remote) {
+ this(project, uri.toASCIIString(), remote);
+ }
+
+ public UriUpdate(String project, String uri, String remote) {
+ this.project = project;
+ this.uri = uri;
this.remote = remote;
}
@Override
public String toString() {
- return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+ return "uri-update " + project + " uri:" + uri + " remote:" + remote;
}
}
@@ -106,28 +134,60 @@
this.disableDeleteForTesting = deleteDisabled;
}
- public void start(PushOne push) {
- for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).start();
+ public boolean start(PushOne push) {
+ UriLock lock = new UriLock(push);
+ if (!lock.acquire()) {
+ return false;
}
+
+ boolean started = false;
+ for (String ref : push.getRefs()) {
+ started = new Task(lock, ref).start() || started;
+ }
+
+ if (!started) { // No tasks left, likely replicated externally
+ lock.release();
+ }
+ return started;
}
public void reset(PushOne push) {
+ UriLock lock = new UriLock(push);
for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).reset();
+ new Task(lock, ref).reset();
}
+ lock.release();
}
public void resetAll() {
- for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
- new Task(r).reset();
+ try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+ for (Path dir : dirs) {
+ UriLock lock = null;
+ for (ReplicateRefUpdate u : list(dir)) {
+ if (lock == null) {
+ lock = new UriLock(u);
+ }
+ new Task(u).reset();
+ }
+ if (lock != null) {
+ lock.release();
+ }
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while aborting running tasks");
}
}
+ public boolean isWaiting(PushOne push) {
+ return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+ }
+
public void finish(PushOne push) {
- for (String ref : push.getRefs()) {
- new Task(new ReplicateRefUpdate(push, ref)).finish();
+ UriLock lock = new UriLock(push);
+ for (ReplicateRefUpdate r : list(lock.runningDir)) {
+ new Task(lock, r.ref).finish();
}
+ lock.release();
}
public List<ReplicateRefUpdate> listWaiting() {
@@ -172,6 +232,53 @@
}
}
+ private class UriLock {
+ public final UriUpdate update;
+ public final String uriKey;
+ public final Path runningDir;
+
+ public UriLock(PushOne push) {
+ this(new UriUpdate(push));
+ }
+
+ public UriLock(UriUpdate update) {
+ this.update = update;
+ uriKey = sha1(update.uri).name();
+ runningDir = createDir(runningUpdates).resolve(uriKey);
+ }
+
+ public boolean acquire() {
+ try {
+ logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+ Files.createDirectory(runningDir);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ return false; // already running, likely externally
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+ return true; // safer to risk a duplicate than to skip it
+ }
+ }
+
+ public void release() {
+ if (disableDeleteForTesting) {
+ logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+ return;
+ }
+
+ try {
+ logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+ Files.delete(runningDir);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+ }
+ }
+
+ private String updateLog() {
+ return String.format("(%s => %s)", update.project, update.uri);
+ }
+ }
+
private class Task {
public final ReplicateRefUpdate update;
public final String json;
@@ -179,12 +286,20 @@
public final Path running;
public final Path waiting;
- public Task(ReplicateRefUpdate update) {
- this.update = update;
+ public Task(ReplicateRefUpdate r) {
+ this(new UriLock(r), r.ref);
+ }
+
+ public Task(PushOne push, String ref) {
+ this(new UriLock(push), ref);
+ }
+
+ public Task(UriLock lock, String ref) {
+ update = new ReplicateRefUpdate(lock.update, ref);
json = GSON.toJson(update) + "\n";
String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
taskKey = sha1(key).name();
- running = createDir(runningUpdates).resolve(taskKey);
+ running = lock.runningDir.resolve(taskKey);
waiting = createDir(waitingUpdates).resolve(taskKey);
}
@@ -205,14 +320,19 @@
return taskKey;
}
- public void start() {
+ public boolean start() {
rename(waiting, running);
+ return Files.exists(running);
}
public void reset() {
rename(running, waiting);
}
+ public boolean isWaiting() {
+ return Files.exists(waiting);
+ }
+
public void finish() {
if (disableDeleteForTesting) {
logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
@@ -223,7 +343,7 @@
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
return new RunwayStatus(false, inFlightPushId);
}
+ public static RunwayStatus deniedExternal() {
+ return new RunwayStatus(false, -1);
+ }
+
private final boolean allowed;
private final int inFlightPushId;
@@ -43,6 +47,10 @@
return !allowed && inFlightPushId == 0;
}
+ public boolean isExternalInflight() {
+ return !allowed && inFlightPushId == -1;
+ }
+
public int getInFlightPushId() {
return inFlightPushId;
}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 69a371b..adb8d4c 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -24,7 +24,7 @@
* `refs/users/*` (user branches)
* `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
* `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
slaves)
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7233061..19a478d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
: Timeout for SSH connections. If 0, there is no timeout and
the client waits indefinitely. By default, 2 minutes.
+replication.distributionInterval
+: Interval in seconds for running the replication distributor. When
+ run, the replication distributor will add all persisted waiting tasks
+ to the queue to ensure that externally loaded tasks are visible to
+ the current process. If zero, the replication distributor is turned
+ off. By default, zero.
+
+ Turning this on is likely only useful when there are other processes
+ (such as other masters in the same cluster) writing to the same
+ persistence store. To ensure that updates are seen well before their
+ replicationDelay expires when the distributor is used, the recommended
+ value for this is approximately the smallest remote.NAME.replicationDelay
+ divided by 5.
+
replication.lockErrorMaxRetries
: Number of times to retry a replication operation if a lock
error is detected.