Merge branch 'stable-3.5' into stable-3.8

* stable-3.5:
  TasksStorage: Remove synchronized from methods
  Place the replaying flag clearing in a finally
  Demote delete errors when distributor is enabled
  distributor: Reduce log level for no-op consolidations

Change-Id: Iaa6d4a59d5cb26879500f2d36852eaa30efc5d7b
Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 04af9db..1f01e2a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@ schedule(project, refs, uri, state, false); } + void scheduleFromStorage( + Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) { + schedule(project, refs, uri, state, false, true); + } + void schedule( Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) { + schedule(project, refs, uri, state, now, false); + } + + void schedule( + Project.NameKey project, + Set<String> refs, + URIish uri, + ReplicationState state, + boolean now, + boolean fromStorage) { Set<String> refsToSchedule = new HashSet<>(); for (String ref : refs) { if (!shouldReplicate(project, ref, state)) { @@ -443,11 +458,14 @@ "scheduled %s:%s => %s to run %s", project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s"); } else { - addRefs(task, ImmutableSet.copyOf(refsToSchedule)); + boolean added = addRefs(task, ImmutableSet.copyOf(refsToSchedule)); task.addState(refsToSchedule, state); - repLog.atInfo().log( - "consolidated %s:%s => %s with an existing pending push", - project, refsToSchedule, task); + String message = "consolidated %s:%s => %s with an existing pending push"; + if (added || !fromStorage) { + repLog.atInfo().log(message, project, refsToSchedule, task); + } else { + repLog.atFine().log(message, project, refsToSchedule, task); + } } for (String ref : refsToSchedule) { state.increasePushTaskCount(project.get(), ref); @@ -486,9 +504,10 @@ pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS); } - private void addRefs(PushOne e, ImmutableSet<String> refs) { - e.addRefBatch(refs); + private boolean addRefs(PushOne e, ImmutableSet<String> refs) { + boolean added = e.addRefBatch(refs); postReplicationScheduledEvent(e, refs); + return added; } /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 6feba66..f9947ae 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -287,14 +287,20 @@ return uri; } - void addRefBatch(ImmutableSet<String> refBatch) { + /** Returns false if all refs were already included in the push, true otherwise */ + boolean addRefBatch(ImmutableSet<String> refBatch) { if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) { refBatchesToPush.clear(); + boolean pushAllRefsChanged = !pushAllRefs; pushAllRefs = true; repLog.atFinest().log("Added all refs for replication to %s", uri); - } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) { - repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri); + return pushAllRefsChanged; } + if (!pushAllRefs && refBatchesToPush.add(refBatch)) { + repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri); + return true; + } + return false; } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 5691ac2..aa00634 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -175,10 +175,10 @@ } } - private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) { + private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) { - dest.schedule(project, refNames, uri, state); + dest.scheduleFromStorage(project, refNames, uri, state); } state.markAllPushTasksScheduled(); } @@ -236,7 +236,7 @@ @Override public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { try { - fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs()); + fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs()); if (Prune.TRUE.equals(prune)) { taskNamesByReplicateRefUpdate.remove(u); } @@ -250,10 +250,13 @@ @Override public void onDone() { - if (Prune.TRUE.equals(prune)) { - pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values())); + try { + if (Prune.TRUE.equals(prune)) { + pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values())); + } + } finally { + replaying.set(false); } - replaying.set(false); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 3f92983..2e4b619 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -167,11 +167,11 @@ return isMultiPrimary; } - public synchronized String create(ReplicateRefUpdate r) { + public String create(ReplicateRefUpdate r) { return new Task(r).create(); } - public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) { + public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) { Set<ImmutableSet<String>> startedRefs = new HashSet<>(); for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { Task t = new Task(update); @@ -182,13 +182,13 @@ return startedRefs; } - public synchronized void reset(UriUpdates uriUpdates) { + public void reset(UriUpdates uriUpdates) { for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { new Task(update).reset(); } } - public synchronized void recoverAll() { + public void recoverAll() { streamRunning().forEach(r -> new Task(r).recover()); } @@ -400,7 +400,15 @@ logger.atFine().log("DELETE %s %s", running, updateLog()); Files.delete(running); } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + String message = "Error while deleting task %s"; + if (isMultiPrimary() && e instanceof NoSuchFileException) { + logger.atFine().log( + message + + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)", + taskKey); + } else { + logger.atSevere().withCause(e).log(message, taskKey); + } } }