Add local projects filtering to the message receivers

On stable-3.5 global/local projects filtering is moved from producer to
receiver side. This means that messages are sent for both local and
global projects. To allow rolling upgrades add global/local projects
filtering on receiving side. This ensure that messages produced for
local projects on stable-3.5 node are filtered out on stable-3.4 as well.

Bug: Issue 14907
Change-Id: I95dc0e71c823ff18a962c1d11daedb75c45e01bc
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7a20c7f..1f3b092 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -58,6 +58,8 @@
 
   protected abstract EventTopic getTopic();
 
+  protected abstract Boolean shouldConsumeEvent(Event event);
+
   public Consumer<Event> getConsumer() {
     return this::processRecord;
   }
@@ -65,12 +67,13 @@
   private void processRecord(Event event) {
     String sourceInstanceId = event.instanceId;
 
-    if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
+    if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
+        || !shouldConsumeEvent(event)) {
       if (Strings.isNullOrEmpty(sourceInstanceId)) {
         logger.atWarning().log(
             String.format(
                 "Dropping event %s because sourceInstanceId cannot be null", event.toString()));
-      } else {
+      } else if (instanceId.equals(sourceInstanceId)) {
         logger.atFiner().log(
             String.format(
                 "Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 80f61f6..cdbf220 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -14,17 +14,23 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 
 @Singleton
 public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public BatchIndexEventSubscriber(
       IndexEventRouter eventRouter,
@@ -32,12 +38,25 @@
       @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.BATCH_INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 5f57156..d8342b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -16,6 +16,7 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -41,4 +42,9 @@
   protected EventTopic getTopic() {
     return EventTopic.CACHE_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 8809799..274c34f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -14,17 +14,23 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 
 @Singleton
 public class IndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public IndexEventSubscriber(
       IndexEventRouter eventRouter,
@@ -32,12 +38,25 @@
       @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 239f3ac..069a516 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -14,17 +14,22 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
 
 @Singleton
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public ProjectUpdateEventSubscriber(
       ProjectListUpdateRouter eventRouter,
@@ -32,12 +37,19 @@
       @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.PROJECT_LIST_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    return projectsFilter.matches(((ProjectListUpdateEvent) event).projectName);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 57a3f51..ab64651 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -14,8 +14,11 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -25,6 +28,8 @@
 
 @Singleton
 public class StreamEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public StreamEventSubscriber(
       StreamEventRouter eventRouter,
@@ -32,12 +37,22 @@
       @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.STREAM_EVENT_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ProjectEvent) {
+      return projectsFilter.matches(((ProjectEvent) event).getProjectNameKey().get());
+    }
+    return true;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
new file mode 100644
index 0000000..14298ca
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -0,0 +1,136 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Broker;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Ignore
+public abstract class AbstractSubscriberTestBase {
+  protected static final String NODE_INSTANCE_ID = "node-instance-id";
+  protected static final String INSTANCE_ID = "other-node-instance-id";
+  protected static final String PROJECT_NAME = "project-name";
+
+  @Mock protected DroppedEventListener droppedEventListeners;
+  @Mock protected MessageLogger msgLog;
+  @Mock protected SubscriberMetrics subscriberMetrics;
+  @Mock protected Configuration cfg;
+  @Mock protected Broker brokerCfg;
+  @Mock protected ProjectsFilter projectsFilter;
+
+  @SuppressWarnings("rawtypes")
+  protected ForwardedEventRouter eventRouter;
+
+  protected AbstractSubcriber objectUnderTest;
+
+  @Before
+  public void setup() {
+    when(cfg.broker()).thenReturn(brokerCfg);
+    when(brokerCfg.getTopic(any(), any())).thenReturn("test-topic");
+    eventRouter = eventRouter();
+    objectUnderTest = objectUnderTest();
+  }
+
+  @Test
+  public void shouldConsumeEventsWhenNotFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+      objectUnderTest.getConsumer().accept(event);
+      verifyConsumed(event);
+    }
+  }
+
+  @Test
+  public void shouldSkipEventsWhenFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(false);
+      objectUnderTest.getConsumer().accept(event);
+      verifySkipped(event);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldSkipLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(projectsFilter, never()).matches(PROJECT_NAME);
+      verify(eventRouter, never()).route(event);
+      verify(droppedEventListeners, times(1)).onEventDropped(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners);
+    }
+  }
+
+  protected abstract AbstractSubcriber objectUnderTest();
+
+  protected abstract List<Event> events();
+
+  @SuppressWarnings("rawtypes")
+  protected abstract ForwardedEventRouter eventRouter();
+
+  @SuppressWarnings("unchecked")
+  protected void verifySkipped(Event event)
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, never()).route(event);
+    verify(droppedEventListeners, times(1)).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void verifyConsumed(Event event)
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners);
+  }
+
+  protected DynamicSet<DroppedEventListener> asDynamicSet(DroppedEventListener listener) {
+    DynamicSet<DroppedEventListener> result = new DynamicSet<>();
+    result.add("multi-site", listener);
+    return result;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
new file mode 100644
index 0000000..5031f36
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class BatchIndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false;
+  private static final int CHANGE_ID = 1;
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new BatchIndexEventSubscriber(
+        (IndexEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
new file mode 100644
index 0000000..70f811b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class IndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false;
+  private static final int CHANGE_ID = 1;
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new IndexEventSubscriber(
+        (IndexEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..201ee81
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import java.util.List;
+
+public class ProjectUpdateEventSubscriberTest extends AbstractSubscriberTestBase {
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(ProjectListUpdateRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new ProjectUpdateEventSubscriber(
+        (ProjectListUpdateRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
new file mode 100644
index 0000000..a0e97a8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class StreamEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final NameKey PROJECT_NAME_KEY = NameKey.parse(PROJECT_NAME);
+  private @Mock RefUpdatedEvent refUpdatedEvent;
+  private @Mock RefReplicationDoneEvent refReplicationDoneEvent;
+  private @Mock ReplicationScheduledEvent replicationScheduledEvent;
+  private @Mock RefReplicatedEvent refReplicatedEvent;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(StreamEventRouter.class);
+  }
+
+  @Override
+  protected List<Event> events() {
+    when(refUpdatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refUpdatedEvent.instanceId = INSTANCE_ID;
+    when(refReplicationDoneEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refReplicationDoneEvent.instanceId = INSTANCE_ID;
+    when(replicationScheduledEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    replicationScheduledEvent.instanceId = INSTANCE_ID;
+    when(refReplicatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    refReplicatedEvent.instanceId = INSTANCE_ID;
+
+    return ImmutableList.of(
+        refUpdatedEvent, refReplicationDoneEvent, replicationScheduledEvent, refReplicatedEvent);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldNotConsumeNonProjectEventTypeEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new StreamEventSubscriber(
+        (StreamEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}