Merge branch 'stable-3.8' into stable-3.10 * stable-3.8: PushOne: Remove unused addRef() method TasksStorage: Remove synchronized from methods Place the replaying flag clearing in a finally Demote delete errors when distributor is enabled distributor: Reduce log level for no-op consolidations Change-Id: I15eb65d0f379a6e562d3c86e1089cfb7d9e6752c Release-Notes: skip
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 6054a4a..c186493 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -392,8 +392,23 @@ schedule(project, refs, uri, state, false); } + void scheduleFromStorage( + Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) { + schedule(project, refs, uri, state, false, true); + } + void schedule( Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) { + schedule(project, refs, uri, state, now, false); + } + + void schedule( + Project.NameKey project, + Set<String> refs, + URIish uri, + ReplicationState state, + boolean now, + boolean fromStorage) { ImmutableSet.Builder<String> toSchedule = ImmutableSet.builder(); for (String ref : refs) { if (!shouldReplicate(project, ref, state)) { @@ -445,11 +460,14 @@ "scheduled %s:%s => %s to run %s", project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s"); } else { - task.addRefBatch(refsToSchedule); + boolean added = task.addRefBatch(refsToSchedule); task.addState(refsToSchedule, state); - repLog.atInfo().log( - "consolidated %s:%s => %s with an existing pending push", - project, refsToSchedule, task); + String message = "consolidated %s:%s => %s with an existing pending push"; + if (added || !fromStorage) { + repLog.atInfo().log(message, project, refsToSchedule, task); + } else { + repLog.atFine().log(message, project, refsToSchedule, task); + } } for (String ref : refsToSchedule) { state.increasePushTaskCount(project.get(), ref);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 9d152f6..2b60145 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -289,18 +289,20 @@ return uri; } - void addRef(String ref) { - addRefBatch(ImmutableSet.of(ref)); - } - - void addRefBatch(ImmutableSet<String> refBatch) { + /** Returns false if all refs were already included in the push, true otherwise */ + boolean addRefBatch(ImmutableSet<String> refBatch) { if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) { refBatchesToPush.clear(); + boolean pushAllRefsChanged = !pushAllRefs; pushAllRefs = true; repLog.atFinest().log("Added all refs for replication to %s", uri); - } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) { - repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri); + return pushAllRefsChanged; } + if (!pushAllRefs && refBatchesToPush.add(refBatch)) { + repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri); + return true; + } + return false; } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 666eb64..267ba4e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -176,10 +176,10 @@ } } - private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) { + private void fireFromStorage(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) { - dest.schedule(project, refNames, uri, state); + dest.scheduleFromStorage(project, refNames, uri, state); } state.markAllPushTasksScheduled(); } @@ -237,7 +237,7 @@ @Override public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { try { - fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs()); + fireFromStorage(new URIish(u.uri()), Project.nameKey(u.project()), u.refs()); if (Prune.TRUE.equals(prune)) { taskNamesByReplicateRefUpdate.remove(u); } @@ -251,10 +251,13 @@ @Override public void onDone() { - if (Prune.TRUE.equals(prune)) { - pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values())); + try { + if (Prune.TRUE.equals(prune)) { + pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values())); + } + } finally { + replaying.set(false); } - replaying.set(false); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index aa711b6..40f5278 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -168,11 +168,11 @@ return isMultiPrimary; } - public synchronized String create(ReplicateRefUpdate r) { + public String create(ReplicateRefUpdate r) { return new Task(r).create(); } - public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) { + public Set<ImmutableSet<String>> start(UriUpdates uriUpdates) { Set<ImmutableSet<String>> startedRefs = new HashSet<>(); for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { Task t = new Task(update); @@ -183,13 +183,13 @@ return startedRefs; } - public synchronized void reset(UriUpdates uriUpdates) { + public void reset(UriUpdates uriUpdates) { for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { new Task(update).reset(); } } - public synchronized void recoverAll() { + public void recoverAll() { streamRunning().forEach(r -> new Task(r).recover()); } @@ -401,7 +401,15 @@ logger.atFine().log("DELETE %s %s", running, updateLog()); Files.delete(running); } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + String message = "Error while deleting task %s"; + if (isMultiPrimary() && e instanceof NoSuchFileException) { + logger.atFine().log( + message + + " (expected after recovery from another node's startup with multi-primaries and distributor enabled)", + taskKey); + } else { + logger.atSevere().withCause(e).log(message, taskKey); + } } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index 4670dff..aa29846 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -204,7 +204,7 @@ PushOne pushOne = createPushOne(replicationPushFilter); - pushOne.addRef(PushOne.ALL_REFS); + pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS)); pushOne.run(); isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS); @@ -226,7 +226,7 @@ PushOne pushOne = createPushOne(replicationPushFilter); - pushOne.addRef(PushOne.ALL_REFS); + pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS)); pushOne.run(); isCallFinished.await(10, TimeUnit.SECONDS); @@ -353,7 +353,7 @@ when(gitRepositoryManagerMock.openRepository(projectNameKey)) .thenThrow(new RepositoryNotFoundException("not found")); PushOne pushOne = createPushOne(null); - pushOne.addRef(PushOne.ALL_REFS); + pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS)); pushOne.setToRetry(); pushOne.run(); assertThat(pushOne.isRetrying()).isFalse(); @@ -365,7 +365,7 @@ NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001")); localRefs.add(barLocalRef); - pushOne.addRef(PushOne.ALL_REFS); + pushOne.addRefBatch(ImmutableSet.of(PushOne.ALL_REFS)); pushOne.run(); isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);