Persist tasks before scheduling replication
Consolidate the replication firing paths and in doing so improve the
replicate-all path to persist events before scheduling them, thus
reducing the chance the task could be lost on server interruption. This
consolidation also has the side effect of adding replicate-all events to
the "beforeStartupEventsQueue" if they somehow happen to get fired
before replication is started.
Change-Id: Iaadbb89c4e5bc2f0eb751d26059868ba9cb7e9d7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 3e17167..9b70a6c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -106,49 +106,43 @@
@VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
- return;
- }
-
- for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
- if (cfg.wouldPushProject(project)) {
- for (URIish uri : cfg.getURIs(project, urlMatch)) {
- cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
- replicationTasksStorage.persist(
- new ReplicateRefUpdate(
- project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
- }
- }
- }
+ fire(project, urlMatch, PushOne.ALL_REFS, state, now);
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ fire(event.getProjectName(), event.getRefName());
}
- private void onGitReferenceUpdated(String projectName, String refName) {
+ private void fire(String projectName, String refName) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ fire(Project.nameKey(projectName), null, refName, state, false);
+ state.markAllPushTasksScheduled();
+ }
+
+ private void fire(
+ Project.NameKey project,
+ String urlMatch,
+ String refName,
+ ReplicationState state,
+ boolean now) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+ beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
return;
}
- Project.NameKey project = Project.nameKey(projectName);
for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
- for (URIish uri : cfg.getURIs(project, null)) {
+ for (URIish uri : cfg.getURIs(project, urlMatch)) {
replicationTasksStorage.persist(
- new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
- cfg.schedule(project, refName, uri, state);
+ new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ cfg.schedule(project, refName, uri, state, now);
}
}
}
- state.markAllPushTasksScheduled();
}
private void firePendingEvents() {
@@ -160,7 +154,7 @@
String eventKey = String.format("%s:%s", t.project, t.ref);
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", eventKey);
- onGitReferenceUpdated(t.project, t.ref);
+ fire(t.project, t.ref);
eventsReplayed.add(eventKey);
}
}
@@ -189,7 +183,7 @@
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", event);
- onGitReferenceUpdated(event.projectName(), event.refName());
+ fire(event.projectName(), event.refName());
eventsReplayed.add(eventKey);
}
}