Add local projects filtering to the message receivers
On stable-3.5 global/local projects filtering is moved from producer to
receiver side. This means that messages are sent for both local and
global projects. To allow rolling upgrades add global/local projects
filtering on receiving side. This ensure that messages produced for
local projects on stable-3.5 node are filtered out on stable-3.4 as well.
Bug: Issue 14907
Change-Id: I95dc0e71c823ff18a962c1d11daedb75c45e01bc
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7a20c7f..1f3b092 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -58,6 +58,8 @@
protected abstract EventTopic getTopic();
+ protected abstract Boolean shouldConsumeEvent(Event event);
+
public Consumer<Event> getConsumer() {
return this::processRecord;
}
@@ -65,12 +67,13 @@
private void processRecord(Event event) {
String sourceInstanceId = event.instanceId;
- if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
+ if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
+ || !shouldConsumeEvent(event)) {
if (Strings.isNullOrEmpty(sourceInstanceId)) {
logger.atWarning().log(
String.format(
"Dropping event %s because sourceInstanceId cannot be null", event.toString()));
- } else {
+ } else if (instanceId.equals(sourceInstanceId)) {
logger.atFiner().log(
String.format(
"Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 80f61f6..cdbf220 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -14,17 +14,23 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@Singleton
public class BatchIndexEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public BatchIndexEventSubscriber(
IndexEventRouter eventRouter,
@@ -32,12 +38,25 @@
@GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.BATCH_INDEX_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ChangeIndexEvent) {
+ return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+ }
+ if (event instanceof ProjectIndexEvent) {
+ return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+ }
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 5f57156..d8342b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -16,6 +16,7 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -41,4 +42,9 @@
protected EventTopic getTopic() {
return EventTopic.CACHE_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 8809799..274c34f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -14,17 +14,23 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@Singleton
public class IndexEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public IndexEventSubscriber(
IndexEventRouter eventRouter,
@@ -32,12 +38,25 @@
@GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.INDEX_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ChangeIndexEvent) {
+ return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+ }
+ if (event instanceof ProjectIndexEvent) {
+ return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+ }
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 239f3ac..069a516 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -14,17 +14,22 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
@Singleton
public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public ProjectUpdateEventSubscriber(
ProjectListUpdateRouter eventRouter,
@@ -32,12 +37,19 @@
@GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.PROJECT_LIST_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ return projectsFilter.matches(((ProjectListUpdateEvent) event).projectName);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 57a3f51..ab64651 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -14,8 +14,11 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -25,6 +28,8 @@
@Singleton
public class StreamEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public StreamEventSubscriber(
StreamEventRouter eventRouter,
@@ -32,12 +37,22 @@
@GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.STREAM_EVENT_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ProjectEvent) {
+ return projectsFilter.matches(((ProjectEvent) event).getProjectNameKey().get());
+ }
+ return true;
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
new file mode 100644
index 0000000..14298ca
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -0,0 +1,136 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Broker;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Ignore
+public abstract class AbstractSubscriberTestBase {
+ protected static final String NODE_INSTANCE_ID = "node-instance-id";
+ protected static final String INSTANCE_ID = "other-node-instance-id";
+ protected static final String PROJECT_NAME = "project-name";
+
+ @Mock protected DroppedEventListener droppedEventListeners;
+ @Mock protected MessageLogger msgLog;
+ @Mock protected SubscriberMetrics subscriberMetrics;
+ @Mock protected Configuration cfg;
+ @Mock protected Broker brokerCfg;
+ @Mock protected ProjectsFilter projectsFilter;
+
+ @SuppressWarnings("rawtypes")
+ protected ForwardedEventRouter eventRouter;
+
+ protected AbstractSubcriber objectUnderTest;
+
+ @Before
+ public void setup() {
+ when(cfg.broker()).thenReturn(brokerCfg);
+ when(brokerCfg.getTopic(any(), any())).thenReturn("test-topic");
+ eventRouter = eventRouter();
+ objectUnderTest = objectUnderTest();
+ }
+
+ @Test
+ public void shouldConsumeEventsWhenNotFilteredByProjectName()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+ objectUnderTest.getConsumer().accept(event);
+ verifyConsumed(event);
+ }
+ }
+
+ @Test
+ public void shouldSkipEventsWhenFilteredByProjectName()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ when(projectsFilter.matches(any(String.class))).thenReturn(false);
+ objectUnderTest.getConsumer().accept(event);
+ verifySkipped(event);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldSkipLocalEvents()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ event.instanceId = NODE_INSTANCE_ID;
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, never()).route(event);
+ verify(droppedEventListeners, times(1)).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+ }
+
+ protected abstract AbstractSubcriber objectUnderTest();
+
+ protected abstract List<Event> events();
+
+ @SuppressWarnings("rawtypes")
+ protected abstract ForwardedEventRouter eventRouter();
+
+ @SuppressWarnings("unchecked")
+ protected void verifySkipped(Event event)
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+ verify(eventRouter, never()).route(event);
+ verify(droppedEventListeners, times(1)).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void verifyConsumed(Event event)
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+
+ protected DynamicSet<DroppedEventListener> asDynamicSet(DroppedEventListener listener) {
+ DynamicSet<DroppedEventListener> result = new DynamicSet<>();
+ result.add("multi-site", listener);
+ return result;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
new file mode 100644
index 0000000..5031f36
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class BatchIndexEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final boolean DELETED = false;
+ private static final int CHANGE_ID = 1;
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(IndexEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(
+ new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+ new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new BatchIndexEventSubscriber(
+ (IndexEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
new file mode 100644
index 0000000..70f811b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class IndexEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final boolean DELETED = false;
+ private static final int CHANGE_ID = 1;
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(IndexEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(
+ new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+ new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new IndexEventSubscriber(
+ (IndexEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..201ee81
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import java.util.List;
+
+public class ProjectUpdateEventSubscriberTest extends AbstractSubscriberTestBase {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(ProjectListUpdateRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new ProjectUpdateEventSubscriber(
+ (ProjectListUpdateRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
new file mode 100644
index 0000000..a0e97a8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class StreamEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final NameKey PROJECT_NAME_KEY = NameKey.parse(PROJECT_NAME);
+ private @Mock RefUpdatedEvent refUpdatedEvent;
+ private @Mock RefReplicationDoneEvent refReplicationDoneEvent;
+ private @Mock ReplicationScheduledEvent replicationScheduledEvent;
+ private @Mock RefReplicatedEvent refReplicatedEvent;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(StreamEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ when(refUpdatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refUpdatedEvent.instanceId = INSTANCE_ID;
+ when(refReplicationDoneEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refReplicationDoneEvent.instanceId = INSTANCE_ID;
+ when(replicationScheduledEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ replicationScheduledEvent.instanceId = INSTANCE_ID;
+ when(refReplicatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refReplicatedEvent.instanceId = INSTANCE_ID;
+
+ return ImmutableList.of(
+ refUpdatedEvent, refReplicationDoneEvent, replicationScheduledEvent, refReplicatedEvent);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldNotConsumeNonProjectEventTypeEvents()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new StreamEventSubscriber(
+ (StreamEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}