Merge branch 'stable-3.8' into stable-3.10
* stable-3.8:
PushOne: Remove unused addRef() method
TasksStorage: Remove synchronized from methods
Place the replaying flag clearing in a finally
Demote delete errors when distributor is enabled
distributor: Reduce log level for no-op consolidations
Change-Id: I15eb65d0f379a6e562d3c86e1089cfb7d9e6752c
Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 6054a4a..c186493 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@
schedule(project, refs, uri, state, false);
}
+ void scheduleFromStorage(
+ Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+ schedule(project, refs, uri, state, false, true);
+ }
+
void schedule(
Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+ schedule(project, refs, uri, state, now, false);
+ }
+
+ void schedule(
+ Project.NameKey project,
+ Set<String> refs,
+ URIish uri,
+ ReplicationState state,
+ boolean now,
+ boolean fromStorage) {
ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder();
for (String ref : refs) {
if (!shouldReplicate(project, ref, state)) {
@@ -445,11 +460,14 @@
"scheduled %s:%s => %s to run %s",
project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
} else {
- task.addRefBatch(refsToSchedule);
+ boolean added = task.addRefBatch(refsToSchedule);
task.addState(refsToSchedule, state);
- repLog.atInfo().log(
- "consolidated %s:%s => %s with an existing pending push",
- project, refsToSchedule, task);
+ String message = "consolidated %s:%s => %s with an existing pending push";
+ if (added || !fromStorage) {
+ repLog.atInfo().log(message, project, refsToSchedule, task);
+ } else {
+ repLog.atFine().log(message, project, refsToSchedule, task);
+ }
}
for (String ref : refsToSchedule) {
state.increasePushTaskCount(project.get(), ref);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 9d152f6..2b60145 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -289,18 +289,20 @@
return uri;
}
- void addRef(String ref) {
- addRefBatch(ImmutableSet.of(ref));
- }
-
- void addRefBatch(ImmutableSet<String> refBatch) {
+ /** Returns false if all refs were already included in the push, true otherwise */
+ boolean addRefBatch(ImmutableSet<String> refBatch) {
if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
refBatchesToPush.clear();
+ boolean pushAllRefsChanged = !pushAllRefs;
pushAllRefs = true;
repLog.atFinest().log("Added all refs for replication to %s", uri);
- } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
- repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+ return pushAllRefsChanged;
}
+ if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+ repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
+ return true;
+ }
+ return false;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 666eb64..267ba4e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -176,10 +176,10 @@
}
}
- private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
+ private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
- dest.schedule(project, refNames, uri, state);
+ dest.scheduleFromStorage(project, refNames, uri, state);
}
state.markAllPushTasksScheduled();
}
@@ -237,7 +237,7 @@
@Override
public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
try {
- fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
+ fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
if (Prune.TRUE.equals(prune)) {
taskNamesByReplicateRefUpdate.remove(u);
}
@@ -251,10 +251,13 @@
@Override
public void onDone() {
- if (Prune.TRUE.equals(prune)) {
- pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+ try {
+ if (Prune.TRUE.equals(prune)) {
+ pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+ }
+ } finally {
+ replaying.set(false);
}
- replaying.set(false);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index aa711b6..40f5278 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -168,11 +168,11 @@
return isMultiPrimary;
}
- public synchronized String create(ReplicateRefUpdate r) {
+ public String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
- public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+ public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
Set<ImmutableSet<String>> startedRefs = new HashSet<>();
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
Task t = new Task(update);
@@ -183,13 +183,13 @@
return startedRefs;
}
- public synchronized void reset(UriUpdates uriUpdates) {
+ public void reset(UriUpdates uriUpdates) {
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
new Task(update).reset();
}
}
- public synchronized void recoverAll() {
+ public void recoverAll() {
streamRunning().forEach(r -> new Task(r).recover());
}
@@ -401,7 +401,15 @@
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ String message = "Error while deleting task %s";
+ if (isMultiPrimary() && e instanceof NoSuchFileException) {
+ logger.atFine().log(
+ message
+ + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)",
+ taskKey);
+ } else {
+ logger.atSevere().withCause(e).log(message, taskKey);
+ }
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 4670dff..aa29846 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -204,7 +204,7 @@
PushOne pushOne = createPushOne(replicationPushFilter);
- pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
pushOne.run();
isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
@@ -226,7 +226,7 @@
PushOne pushOne = createPushOne(replicationPushFilter);
- pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
pushOne.run();
isCallFinished.await(10, TimeUnit.SECONDS);
@@ -353,7 +353,7 @@
when(gitRepositoryManagerMock.openRepository(projectNameKey))
.thenThrow(new RepositoryNotFoundException("not found"));
PushOne pushOne = createPushOne(null);
- pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
pushOne.setToRetry();
pushOne.run();
assertThat(pushOne.isRetrying()).isFalse();
@@ -365,7 +365,7 @@
NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
localRefs.add(barLocalRef);
- pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS));
pushOne.run();
isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);