Refactor ReplicationQueue to pass around ReferenceUpdatedEvent
Change-Id: I758a0766156e5563998d97f4c52ee10ab5e88b93
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index f5bb513..f5b27ee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -156,11 +156,7 @@
event.getRefName(),
event.getOldObjectId(),
event.getNewObjectId());
- fire(
- event.getProjectName(),
- ObjectId.fromString(event.getNewObjectId()),
- event.getRefName(),
- event.isDelete());
+ fire(ReferenceUpdatedEvent.from(event));
}
}
@@ -189,24 +185,18 @@
return !refsFilter.match(refName);
}
- private void fire(String projectName, ObjectId objectId, String refName, boolean isDelete) {
+ private void fire(ReferenceUpdatedEvent event) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
- fire(Project.nameKey(projectName), objectId, refName, isDelete, state);
+ fire(event, state);
state.markAllFetchTasksScheduled();
}
- private void fire(
- Project.NameKey project,
- ObjectId objectId,
- String refName,
- boolean isDelete,
- ReplicationState state) {
+ private void fire(ReferenceUpdatedEvent event, ReplicationState state) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(
- ReferenceUpdatedEvent.create(project.get(), refName, objectId, isDelete));
+ beforeStartupEventsQueue.add(event);
return;
}
ForkJoinPool fetchCallsPool = null;
@@ -220,7 +210,12 @@
fetchCallsPool = new ForkJoinPool(numSources);
final Consumer<Source> callFunction =
- callFunction(project, objectId, refName, isDelete, state);
+ callFunction(
+ Project.nameKey(event.projectName()),
+ event.objectId(),
+ event.refName(),
+ event.isDelete(),
+ state);
fetchCallsPool
.submit(() -> allSources.parallelStream().forEach(callFunction))
.get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
@@ -497,7 +492,7 @@
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", event);
- fire(event.projectName(), event.objectId(), event.refName(), event.isDelete());
+ fire(event);
eventsReplayed.add(eventKey);
}
}
@@ -523,6 +518,14 @@
projectName, refName, objectId, isDelete);
}
+ static ReferenceUpdatedEvent from(GitReferenceUpdatedListener.Event event) {
+ return ReferenceUpdatedEvent.create(
+ event.getProjectName(),
+ event.getRefName(),
+ ObjectId.fromString(event.getNewObjectId()),
+ event.isDelete());
+ }
+
public abstract String projectName();
public abstract String refName();