Fix NPE issue from pull replication after configuration auto reload

Add missing work queue creation to solve NPE when configuration is
auto reloaded.

Feature: Issue 12729
Change-Id: Ica4df470ae3f9b3c6f9d50f942b1cb438b1defc3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
index 81b77a3..7be4971 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -21,6 +21,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ConfigParser;
 import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
@@ -36,17 +37,20 @@
   private final Source.Factory sourceFactory;
   private volatile List<Source> sources;
   private boolean shuttingDown;
+  private final Provider<ReplicationQueue> replicationQueue;
 
   @Inject
   public SourcesCollection(
       ReplicationConfig replicationConfig,
       ConfigParser configParser,
       Source.Factory sourceFactory,
-      EventBus eventBus)
+      EventBus eventBus,
+      Provider<ReplicationQueue> replicationQueue)
       throws ConfigInvalidException {
     this.sourceFactory = sourceFactory;
     this.sources =
         allSources(sourceFactory, configParser.parseRemotes(replicationConfig.getConfig()));
+    this.replicationQueue = replicationQueue;
     eventBus.register(this);
   }
 
@@ -109,8 +113,12 @@
       logger.atWarning().log("Shutting down: configuration reload ignored");
       return;
     }
-
-    sources = allSources(sourceFactory, sourceConfigurations);
-    logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
+    try {
+      replicationQueue.get().stop();
+      sources = allSources(sourceFactory, sourceConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
+    } finally {
+      replicationQueue.get().start();
+    }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
index fa99a62..cb5af42 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
@@ -122,6 +122,41 @@
   }
 
   @Test
+  public void shouldReplicateNewChangeRefAfterConfigReloaded() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    // Trigger configuration autoreload
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+    SourcesCollection sources = getInstance(SourcesCollection.class);
+    remoteConfig.setInt("remote", null, "timeout", 1000);
+    remoteConfig.save();
+    autoReloadConfigDecorator.reload();
+    waitUntil(() -> !sources.getAll().isEmpty() && sources.getAll().get(0).getTimeout() == 1000);
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    final String sourceRef = pushResult.getPatchSet().refName();
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
   public void shouldReplicateNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);