Merge branch 'stable-3.5' into stable-3.8
* stable-3.5:
TasksStorage: Remove synchronized from methods
Place the replaying flag clearing in a finally
Demote delete errors when distributor is enabled
distributor: Reduce log level for no-op consolidations
Change-Id: Iaa6d4a59d5cb26879500f2d36852eaa30efc5d7b
Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 04af9db..1f01e2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@
schedule(project, refs, uri, state, false);
}
+ void scheduleFromStorage(
+ Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+ schedule(project, refs, uri, state, false, true);
+ }
+
void schedule(
Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+ schedule(project, refs, uri, state, now, false);
+ }
+
+ void schedule(
+ Project.NameKey project,
+ Set<String> refs,
+ URIish uri,
+ ReplicationState state,
+ boolean now,
+ boolean fromStorage) {
Set<String> refsToSchedule = new HashSet<>();
for (String ref : refs) {
if (!shouldReplicate(project, ref, state)) {
@@ -443,11 +458,14 @@
"scheduled %s:%s => %s to run %s",
project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
} else {
- addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+ boolean added = addRefs(task, ImmutableSet.copyOf(refsToSchedule));
task.addState(refsToSchedule, state);
- repLog.atInfo().log(
- "consolidated %s:%s => %s with an existing pending push",
- project, refsToSchedule, task);
+ String message = "consolidated %s:%s => %s with an existing pending push";
+ if (added || !fromStorage) {
+ repLog.atInfo().log(message, project, refsToSchedule, task);
+ } else {
+ repLog.atFine().log(message, project, refsToSchedule, task);
+ }
}
for (String ref : refsToSchedule) {
state.increasePushTaskCount(project.get(), ref);
@@ -486,9 +504,10 @@
pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
}
- private void addRefs(PushOne e, ImmutableSet<String> refs) {
- e.addRefBatch(refs);
+ private boolean addRefs(PushOne e, ImmutableSet<String> refs) {
+ boolean added = e.addRefBatch(refs);
postReplicationScheduledEvent(e, refs);
+ return added;
}
/**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 6feba66..f9947ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -287,14 +287,20 @@
return uri;
}
- void addRefBatch(ImmutableSet<String> refBatch) {
+ /** Returns false if all refs were already included in the push, true otherwise */
+ boolean addRefBatch(ImmutableSet<String> refBatch) {
if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
refBatchesToPush.clear();
+ boolean pushAllRefsChanged = !pushAllRefs;
pushAllRefs = true;
repLog.atFinest().log("Added all refs for replication to %s", uri);
- } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
- repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+ return pushAllRefsChanged;
}
+ if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+ repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+ return true;
+ }
+ return false;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5691ac2..aa00634 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -175,10 +175,10 @@
}
}
- private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
+ private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
- dest.schedule(project, refNames, uri, state);
+ dest.scheduleFromStorage(project, refNames, uri, state);
}
state.markAllPushTasksScheduled();
}
@@ -236,7 +236,7 @@
@Override
public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
try {
- fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
+ fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
if (Prune.TRUE.equals(prune)) {
taskNamesByReplicateRefUpdate.remove(u);
}
@@ -250,10 +250,13 @@
@Override
public void onDone() {
- if (Prune.TRUE.equals(prune)) {
- pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+ try {
+ if (Prune.TRUE.equals(prune)) {
+ pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+ }
+ } finally {
+ replaying.set(false);
}
- replaying.set(false);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3f92983..2e4b619 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -167,11 +167,11 @@
return isMultiPrimary;
}
- public synchronized String create(ReplicateRefUpdate r) {
+ public String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
- public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+ public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
Set<ImmutableSet<String>> startedRefs = new HashSet<>();
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
Task t = new Task(update);
@@ -182,13 +182,13 @@
return startedRefs;
}
- public synchronized void reset(UriUpdates uriUpdates) {
+ public void reset(UriUpdates uriUpdates) {
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
new Task(update).reset();
}
}
- public synchronized void recoverAll() {
+ public void recoverAll() {
streamRunning().forEach(r -> new Task(r).recover());
}
@@ -400,7 +400,15 @@
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ String message = "Error while deleting task %s";
+ if (isMultiPrimary() && e instanceof NoSuchFileException) {
+ logger.atFine().log(
+ message
+ + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)",
+ taskKey);
+ } else {
+ logger.atSevere().withCause(e).log(message, taskKey);
+ }
}
}