Refactor ReplicationQueue to pass around ReferenceUpdatedEvent Change-Id: I758a0766156e5563998d97f4c52ee10ab5e88b93
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java index f5bb513..f5b27ee 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -156,11 +156,7 @@ event.getRefName(), event.getOldObjectId(), event.getNewObjectId()); - fire( - event.getProjectName(), - ObjectId.fromString(event.getNewObjectId()), - event.getRefName(), - event.isDelete()); + fire(ReferenceUpdatedEvent.from(event)); } } @@ -189,24 +185,18 @@ return !refsFilter.match(refName); } - private void fire(String projectName, ObjectId objectId, String refName, boolean isDelete) { + private void fire(ReferenceUpdatedEvent event) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); - fire(Project.nameKey(projectName), objectId, refName, isDelete, state); + fire(event, state); state.markAllFetchTasksScheduled(); } - private void fire( - Project.NameKey project, - ObjectId objectId, - String refName, - boolean isDelete, - ReplicationState state) { + private void fire(ReferenceUpdatedEvent event, ReplicationState state) { if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", state); - beforeStartupEventsQueue.add( - ReferenceUpdatedEvent.create(project.get(), refName, objectId, isDelete)); + beforeStartupEventsQueue.add(event); return; } ForkJoinPool fetchCallsPool = null; @@ -220,7 +210,12 @@ fetchCallsPool = new ForkJoinPool(numSources); final Consumer<Source> callFunction = - callFunction(project, objectId, refName, isDelete, state); + callFunction( + Project.nameKey(event.projectName()), + event.objectId(), + event.refName(), + event.isDelete(), + state); fetchCallsPool .submit(() -> allSources.parallelStream().forEach(callFunction)) .get(fetchCallsTimeout, TimeUnit.MILLISECONDS); @@ -497,7 +492,7 @@ String eventKey = String.format("%s:%s", event.projectName(), event.refName()); if (!eventsReplayed.contains(eventKey)) { repLog.info("Firing pending task {}", event); - fire(event.projectName(), event.objectId(), event.refName(), event.isDelete()); + fire(event); eventsReplayed.add(eventKey); } } @@ -523,6 +518,14 @@ projectName, refName, objectId, isDelete); } + static ReferenceUpdatedEvent from(GitReferenceUpdatedListener.Event event) { + return ReferenceUpdatedEvent.create( + event.getProjectName(), + event.getRefName(), + ObjectId.fromString(event.getNewObjectId()), + event.isDelete()); + } + public abstract String projectName(); public abstract String refName();