Only publish events generated by the current instance
In a multi-primary setup the event stream contains events from all
primaries. However, each primary has its own events-rabbitmq plugin and
should only publish events that originate from that primary.
Otherwise the events published to the exchange would be duplicated
by nbr_of_primaries^2.
Bug: Issue 333981067
Change-Id: I5e35605c61bb45079d96d94e7db3eb24c74cf464
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
index bd6ea22..1915af7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/worker/DefaultEventWorker.java
@@ -15,8 +15,10 @@
package com.googlesource.gerrit.plugins.rabbitmq.worker;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.message.Publisher;
import java.util.Set;
@@ -28,6 +30,12 @@
private final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Set<Publisher> publishers = new CopyOnWriteArraySet<>();
+ private final String instanceId;
+
+ @Inject
+ public DefaultEventWorker(@GerritInstanceId String instanceId) { // receives the injected @GerritInstanceId value
+ this.instanceId = instanceId; // stored so onEvent() can compare it with event.instanceId; NOTE(review): may be null if no instance id is configured - confirm whether @Nullable is needed
+ }
@Override
public void addPublisher(Publisher publisher) {
@@ -51,8 +59,12 @@
@Override
public void onEvent(Event event) {
- for (Publisher publisher : publishers) {
- publisher.publish(event.type, event);
+ // Publish only events whose instanceId matches this instance. Objects.equals is null-safe,
+ // so an event that carries no instanceId is skipped (or matched when no id is configured) instead of throwing.
+ if (java.util.Objects.equals(instanceId, event.instanceId)) {
+ for (Publisher publisher : publishers) {
+ publisher.publish(event.type, event);
+ }
}
}
}