Stream events listener should respect `remote.NAME.projects` param

remote.NAME.projects param specifies which repositories should be
replicated from the remote. Add filtering to the StreamEventListener
to make sure that pull-replication does not trigger fetch operations
for repositories outside of the list.

Bug: Issue 16751
Change-Id: I43de26602ae84a41631f6f8c6052389e959367ff
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 2961360..0477b1a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -27,12 +27,14 @@
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.DeleteRefCommand;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
@@ -53,6 +55,7 @@
 
   private Factory fetchJobFactory;
   private final Provider<PullReplicationApiRequestMetrics> metricsProvider;
+  private final SourcesCollection sources;
 
   @Inject
   public StreamEventListener(
@@ -61,13 +64,15 @@
       ProjectInitializationAction projectInitializationAction,
       WorkQueue workQueue,
       FetchJob.Factory fetchJobFactory,
-      Provider<PullReplicationApiRequestMetrics> metricsProvider) {
+      Provider<PullReplicationApiRequestMetrics> metricsProvider,
+      SourcesCollection sources) {
     this.instanceId = instanceId;
     this.deleteCommand = deleteCommand;
     this.projectInitializationAction = projectInitializationAction;
     this.workQueue = workQueue;
     this.fetchJobFactory = fetchJobFactory;
     this.metricsProvider = metricsProvider;
+    this.sources = sources;
 
     requireNonNull(
         Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
@@ -86,7 +91,7 @@
   }
 
   public void fetchRefsForEvent(Event event) throws AuthException, PermissionBackendException {
-    if (instanceId.equals(event.instanceId)) {
+    if (instanceId.equals(event.instanceId) || !shouldReplicateProject(event)) {
       return;
     }
 
@@ -138,6 +143,19 @@
     }
   }
 
+  private boolean shouldReplicateProject(Event event) {
+    if (!(event instanceof ProjectEvent)) {
+      return false;
+    }
+
+    ProjectEvent projectEvent = (ProjectEvent) event;
+    return sources.getAll().stream()
+        .filter(s -> s.getRemoteConfigName().equals(projectEvent.instanceId))
+        .findFirst()
+        .map(s -> s.wouldFetchProject(projectEvent.getProjectNameKey()))
+        .orElse(false);
+  }
+
   private boolean isRefDelete(RefUpdatedEvent event) {
     return ZERO_ID_NAME.equals(event.refUpdate.get().newRev);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
index b87d42c..9312432 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -32,6 +33,8 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.DeleteRefCommand;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
@@ -65,6 +68,8 @@
   @Mock private DeleteRefCommand deleteRefCommand;
   @Captor ArgumentCaptor<Input> inputCaptor;
   @Mock private PullReplicationApiRequestMetrics metrics;
+  @Mock private SourcesCollection sources;
+  @Mock private Source source;
 
   private StreamEventListener objectUnderTest;
 
@@ -73,6 +78,9 @@
     when(workQueue.getDefaultQueue()).thenReturn(executor);
     when(fetchJobFactory.create(eq(Project.nameKey(TEST_PROJECT)), any(), any()))
         .thenReturn(fetchJob);
+    when(sources.getAll()).thenReturn(Lists.newArrayList(source));
+    when(source.wouldFetchProject(any())).thenReturn(true);
+    when(source.getRemoteConfigName()).thenReturn(REMOTE_INSTANCE_ID);
     objectUnderTest =
         new StreamEventListener(
             INSTANCE_ID,
@@ -80,7 +88,8 @@
             projectInitializationAction,
             workQueue,
             fetchJobFactory,
-            () -> metrics);
+            () -> metrics,
+            sources);
   }
 
   @Test
@@ -90,6 +99,7 @@
     objectUnderTest.onEvent(event);
 
     verify(executor, never()).submit(any(Runnable.class));
+    verify(sources, never()).getAll();
   }
 
   @Test
@@ -109,6 +119,25 @@
   }
 
   @Test
+  public void shouldSkipEventWhenNotOnAllowedProjectsList() {
+    when(source.wouldFetchProject(any())).thenReturn(false);
+
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = TEST_REF_NAME;
+    refUpdate.project = TEST_PROJECT;
+    refUpdate.oldRev = ObjectId.zeroId().getName();
+    refUpdate.newRev = NEW_REV;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
   public void shouldDeleteRefForRefDeleteEvent() throws IOException, RestApiException {
     RefUpdatedEvent event = new RefUpdatedEvent();
     RefUpdateAttribute refUpdate = new RefUpdateAttribute();