Merge branch 'stable-3.4'

* stable-3.4:
  Always update replication status metrics

Change-Id: I3d367402a13963057e2bffccf8254c00afa34617
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index b16bf03..abdc5c5 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,7 +16,7 @@
 
 LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.4
+GERRIT_BRANCH=master
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
@@ -232,7 +232,7 @@
   -O $COMMON_PLUGINS/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Aborting"; exit 1; }
 
 echo "Downloading healthcheck plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-healthcheck-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
+wget $GERRIT_CI/plugin-healthcheck-bazel-master/$LAST_BUILD/healthcheck/healthcheck.jar \
   -O $COMMON_PLUGINS/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Aborting"; exit 1; }
 
 echo "Downloading zookeeper plugin $GERRIT_BRANCH"
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index b91d0d7..bfffbfe 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,7 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.4",
-        sha1 = "8d361d863382290e33828116e65698190118d0f1",
+        artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108301155",
+        sha1 = "ef4d94bb4ba1d136cd90ea901776f03a25bcb517",
     )
+
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 81e1c12..8497882 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -57,6 +57,9 @@
   private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
 
+  // Default timeout, in milliseconds, for publishing stream events.
+  private static final long DEFAULT_STREAM_EVENT_PUBLISH_TIMEOUT = 30000;
+
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
@@ -190,6 +193,21 @@
     }
   }
 
+  /**
+   * Reads a long setting, falling back to {@code defaultValue} (and logging an error) when the
+   * lookup throws {@link IllegalArgumentException}.
+   */
+  private static long getLong(
+      Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+    try {
+      return cfg.get().getLong(section, subSection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
   public static class Projects {
     public static final String SECTION = "projects";
     public static final String PATTERN_KEY = "pattern";
@@ -293,15 +311,29 @@
 
   public static class Broker {
     static final String BROKER_SECTION = "broker";
+    static final String STREAM_EVENT_PUBLISH_TIMEOUT = "streamEventPublishTimeoutMs";
     private final Config cfg;
+    private final long streamEventPublishTimeout;
 
     Broker(Supplier<Config> cfgSupplier) {
       cfg = cfgSupplier.get();
+      streamEventPublishTimeout =
+          getLong(
+              cfgSupplier,
+              BROKER_SECTION,
+              null,
+              STREAM_EVENT_PUBLISH_TIMEOUT,
+              DEFAULT_STREAM_EVENT_PUBLISH_TIMEOUT);
     }
 
     public String getTopic(String topicKey, String defValue) {
       return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
     }
+
+    /** Returns the configured timeout, in milliseconds, for publishing stream events. */
+    public long getStreamEventPublishTimeout() {
+      return streamEventPublishTimeout;
+    }
   }
 
   static boolean getBoolean(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index ad53449..69bd62f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -54,7 +54,7 @@
       install(new CacheModule());
     }
     if (config.event().synchronize()) {
-      install(new EventModule());
+      install(new EventModule(config));
     }
     if (config.index().synchronize()) {
       install(new IndexModule());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index f58efa7..88422e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -22,11 +22,11 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
@@ -39,17 +39,20 @@
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
+  private final String nodeInstanceId;
 
   @Inject
   public BrokerApiWrapper(
       @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      @GerritInstanceId String instanceId) {
     this.apiDelegate = apiDelegate;
     this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
+    this.nodeInstanceId = instanceId;
   }
 
   public boolean sendSync(String topic, Event event) {
@@ -70,7 +73,7 @@
   @Override
   public ListenableFuture<Boolean> send(String topic, Event message) {
     SettableFuture<Boolean> resultFuture = SettableFuture.create();
-    if (Context.isForwardedEvent()) {
+    if (!nodeInstanceId.equals(message.instanceId)) {
       resultFuture.set(true);
       return resultFuture;
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index 44f8417..6a2c7a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.cache;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.events.ProjectEvent;
@@ -33,18 +32,15 @@
 
   private final DynamicSet<ProjectListUpdateForwarder> forwarders;
   private final Executor executor;
-  private final ProjectsFilter projectsFilter;
   private final String instanceId;
 
   @Inject
   public ProjectListUpdateHandler(
       DynamicSet<ProjectListUpdateForwarder> forwarders,
       @CacheExecutor Executor executor,
-      ProjectsFilter filter,
       @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
-    this.projectsFilter = filter;
     this.instanceId = instanceId;
   }
 
@@ -61,7 +57,7 @@
   }
 
   private void process(ProjectEvent event, boolean delete) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
+    if (!Context.isForwardedEvent()) {
       executor.execute(
           new ProjectListUpdateTask(
               new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
deleted file mode 100644
index 07bbbf7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface EventExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
deleted file mode 100644
index e4979f9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
-
-@Singleton
-class EventExecutorProvider extends ExecutorProvider {
-
-  @Inject
-  EventExecutorProvider(WorkQueue workQueue) {
-    super(workQueue, 1, "Forward-Stream-Event");
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
deleted file mode 100644
index b2efb80..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import java.util.concurrent.Executor;
-
-class EventHandler implements EventListener {
-  private final Executor executor;
-  private final DynamicSet<StreamEventForwarder> forwarders;
-  private final ProjectsFilter projectsFilter;
-
-  @Inject
-  EventHandler(
-      DynamicSet<StreamEventForwarder> forwarders,
-      @EventExecutor Executor executor,
-      ProjectsFilter projectsFilter) {
-    this.forwarders = forwarders;
-    this.executor = executor;
-    this.projectsFilter = projectsFilter;
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    if (!Context.isForwardedEvent() && event instanceof ProjectEvent) {
-      if (projectsFilter.matches(event)) {
-        executor.execute(new EventTask(event));
-      }
-    }
-  }
-
-  class EventTask implements Runnable {
-    private final Event event;
-
-    EventTask(Event event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      forwarders.forEach(f -> f.send(event));
-    }
-
-    @Override
-    public String toString() {
-      return String.format("Send event '%s' to target instance", event.type);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index 1c0c644..b055f26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -14,19 +14,34 @@
 
 package com.googlesource.gerrit.plugins.multisite.event;
 
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherModule;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.events.EventListener;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
-import java.util.concurrent.Executor;
 
 public class EventModule extends LifecycleModule {
+  private final Configuration configuration;
+
+  @Inject
+  public EventModule(Configuration configuration) {
+    this.configuration = configuration;
+  }
 
   @Override
   protected void configure() {
-    bind(Executor.class).annotatedWith(EventExecutor.class).toProvider(EventExecutorProvider.class);
-    listener().to(EventExecutorProvider.class);
-    DynamicSet.bind(binder(), EventListener.class).to(EventHandler.class);
     DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdate.class);
+
+    bind(StreamEventPublisherConfig.class)
+        .toInstance(
+            new StreamEventPublisherConfig(
+                EventTopic.STREAM_EVENT_TOPIC.topic(configuration),
+                configuration.broker().getStreamEventPublishTimeout()));
+
+    install(new StreamEventPublisherModule());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
index dcfd1b0..e2e3954 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
@@ -51,11 +51,8 @@
    */
   public void dispatch(Event event) throws PermissionBackendException {
     try (ManualRequestContext ctx = oneOffCtx.open()) {
-      Context.setForwardedEvent(true);
       log.debug("dispatching event {}", event.getType());
       dispatcher.get().postEvent(event);
-    } finally {
-      Context.unsetForwardedEvent();
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
index ca17004..1d36e23 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
@@ -24,6 +24,5 @@
     DynamicSet.setOf(binder(), CacheEvictionForwarder.class);
     DynamicSet.setOf(binder(), IndexEventForwarder.class);
     DynamicSet.setOf(binder(), ProjectListUpdateForwarder.class);
-    DynamicSet.setOf(binder(), StreamEventForwarder.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
deleted file mode 100644
index 79a9af2..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder;
-
-import com.google.gerrit.server.events.Event;
-
-public interface StreamEventForwarder {
-  /**
-   * Forward a stream event to the other master.
-   *
-   * @param event the event to forward.
-   * @return true if successful, otherwise false.
-   */
-  boolean send(Event event);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 6bd6437..d6c5658 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
 
 public class BrokerForwarderModule extends LifecycleModule {
   @Override
@@ -28,6 +27,5 @@
     DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
     DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
         .to(BrokerProjectListUpdateForwarder.class);
-    DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
deleted file mode 100644
index 8a020fc..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-
-@Singleton
-public class BrokerStreamEventForwarder implements StreamEventForwarder {
-  private final BrokerApiWrapper broker;
-  private final Configuration cfg;
-
-  @Inject
-  BrokerStreamEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
-    this.broker = broker;
-    this.cfg = cfg;
-  }
-
-  @Override
-  public boolean send(Event event) {
-    return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 4e7a781..3255bcf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -21,7 +21,7 @@
   BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
   PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
-  STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
+  STREAM_EVENT_TOPIC("gerrit", "streamEvent");
 
   private final String topic;
   private final String aliasKey;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 02f1b1c..ee16b07 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.index;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.base.Objects;
 import com.google.gerrit.extensions.events.AccountIndexedListener;
 import com.google.gerrit.extensions.events.ChangeIndexedListener;
@@ -47,7 +46,6 @@
   private final DynamicSet<IndexEventForwarder> forwarders;
   private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ChangeCheckerImpl.Factory changeChecker;
-  private final ProjectsFilter projectsFilter;
   private final GroupChecker groupChecker;
   private final String instanceId;
 
@@ -56,13 +54,11 @@
       @IndexExecutor Executor executor,
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
-      ProjectsFilter projectsFilter,
       GroupChecker groupChecker,
       @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
-    this.projectsFilter = projectsFilter;
     this.groupChecker = groupChecker;
     this.instanceId = instanceId;
   }
@@ -101,7 +97,7 @@
 
   @Override
   public void onProjectIndexed(String projectName) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+    if (!Context.isForwardedEvent()) {
       IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
@@ -110,7 +106,7 @@
   }
 
   private void executeIndexChangeTask(String projectName, int id) {
-    if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+    if (!Context.isForwardedEvent()) {
       ChangeChecker checker = changeChecker.create(projectName + "~" + id);
 
       try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index ea7dada..11ea428 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -18,12 +18,12 @@
 import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -33,7 +33,6 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.io.IOException;
 import java.util.Optional;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -58,7 +57,7 @@
   private final GitRepositoryManager gitRepositoryManager;
   private final GitReferenceUpdated gitReferenceUpdated;
   private final ProjectVersionLogger verLogger;
-  private final ProjectsFilter projectsFilter;
+  private final String nodeInstanceId;
 
   protected final SharedRefDatabaseWrapper sharedRefDb;
 
@@ -68,22 +67,20 @@
       SharedRefDatabaseWrapper sharedRefDb,
       GitReferenceUpdated gitReferenceUpdated,
       ProjectVersionLogger verLogger,
-      ProjectsFilter projectsFilter) {
+      @GerritInstanceId String nodeInstanceId) {
     this.gitRepositoryManager = gitRepositoryManager;
     this.sharedRefDb = sharedRefDb;
     this.gitReferenceUpdated = gitReferenceUpdated;
     this.verLogger = verLogger;
-    this.projectsFilter = projectsFilter;
+    this.nodeInstanceId = nodeInstanceId;
   }
 
   @Override
   public void onEvent(Event event) {
     logger.atFine().log("Processing event type: " + event.type);
     // Producer of the Event use RefUpdatedEvent to trigger the version update
-    if (!Context.isForwardedEvent() && event instanceof RefUpdatedEvent) {
-      if (projectsFilter.matches(event)) {
-        updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
-      }
+    if (nodeInstanceId.equals(event.instanceId) && event instanceof RefUpdatedEvent) {
+      updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
     }
   }
 
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index a76545f..e217994 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -58,9 +58,9 @@
 :   Name of the topic to use for publishing indexing events
     Defaults to GERRIT.EVENT.INDEX
 
-```broker.streamEventTopic```
-:   Name of the topic to use for publishing stream events
-    Defaults to GERRIT.EVENT.STREAM
+```broker.streamEventTopic```
+:   Name of the topic to use for publishing all stream events.
+    Defaults to gerrit
 
 ```broker.cacheEventTopic```
 :   Name of the topic to use for publishing cache eviction events
@@ -70,6 +70,10 @@
 :   Name of the topic to use for publishing cache eviction events
     Defaults to GERRIT.EVENT.PROJECT.LIST
 
+```broker.streamEventPublishTimeoutMs```
+:   The timeout in milliseconds for publishing stream events.
+    Defaults to 30000 (30 seconds).
+
 ```ref-database.enabled```
 :   Enable the use of a shared ref-database
     Defaults: true
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 7d1751c..50f55b2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -22,6 +22,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class BrokerApiWrapperTest {
+  private static final String DEFAULT_INSTANCE_ID = "instance-id";
   @Mock private BrokerMetrics brokerMetrics;
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
@@ -32,13 +33,14 @@
 
   @Before
   public void setUp() {
-    event.instanceId = "instance-id";
+    event.instanceId = DEFAULT_INSTANCE_ID;
     objectUnderTest =
         new BrokerApiWrapper(
             MoreExecutors.directExecutor(),
             DynamicItem.itemOf(BrokerApi.class, brokerApi),
             brokerMetrics,
-            msgLog);
+            msgLog,
+            DEFAULT_INSTANCE_ID);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index c8216bd..69968b5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -23,7 +23,6 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.events.NewProjectCreatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
@@ -46,14 +45,12 @@
   private ProjectListUpdateHandler handler;
 
   @Mock private ProjectListUpdateForwarder forwarder;
-  @Mock private ProjectsFilter projectsFilter;
 
   @Before
   public void setUp() {
-    when(projectsFilter.matches(any(String.class))).thenReturn(true);
     handler =
         new ProjectListUpdateHandler(
-            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
+            asDynamicSet(forwarder), MoreExecutors.directExecutor(), INSTANCE_ID);
   }
 
   private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -98,7 +95,6 @@
 
   @Test
   public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
     String projectName = "projectToAdd";
     NewProjectCreatedListener.Event event = mock(NewProjectCreatedListener.Event.class);
     when(event.getProjectName()).thenReturn(projectName);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
deleted file mode 100644
index a025dd0..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventExecutorProviderTest {
-  @Mock private ScheduledThreadPoolExecutor executorMock;
-  private EventExecutorProvider eventsExecutorProvider;
-
-  @Before
-  public void setUp() throws Exception {
-    WorkQueue workQueueMock = mock(WorkQueue.class);
-    when(workQueueMock.createQueue(1, "Forward-Stream-Event")).thenReturn(executorMock);
-    eventsExecutorProvider = new EventExecutorProvider(workQueueMock);
-  }
-
-  @Test
-  public void shouldReturnExecutor() throws Exception {
-    assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
-  }
-
-  @Test
-  public void testStop() throws Exception {
-    eventsExecutorProvider.start();
-    assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
-    eventsExecutorProvider.stop();
-    verify(executorMock).shutdown();
-    assertThat(eventsExecutorProvider.get()).isNull();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
deleted file mode 100644
index b9b416d..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.googlesource.gerrit.plugins.multisite.event.EventHandler.EventTask;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventHandlerTest {
-
-  private EventHandler eventHandler;
-
-  @Mock private StreamEventForwarder forwarder;
-  @Mock private ProjectsFilter projectsFilter;
-
-  @Before
-  public void setUp() {
-    when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(true);
-    eventHandler =
-        new EventHandler(asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
-  }
-
-  private DynamicSet<StreamEventForwarder> asDynamicSet(StreamEventForwarder forwarder) {
-    DynamicSet<StreamEventForwarder> result = new DynamicSet<>();
-    result.add("multi-site", forwarder);
-    return result;
-  }
-
-  @Test
-  public void shouldForwardAnyProjectEvent() throws Exception {
-    ProjectEvent event = mock(ProjectEvent.class);
-    eventHandler.onEvent(event);
-    verify(forwarder).send(event);
-  }
-
-  @Test
-  public void shouldNotForwardNonProjectEvent() throws Exception {
-    eventHandler.onEvent(mock(Event.class));
-    verifyZeroInteractions(forwarder);
-  }
-
-  @Test
-  public void shouldNotForwardIfAlreadyForwardedEvent() throws Exception {
-    Context.setForwardedEvent(true);
-    eventHandler.onEvent(mock(ProjectEvent.class));
-    Context.unsetForwardedEvent();
-    verifyZeroInteractions(forwarder);
-  }
-
-  @Test
-  public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(false);
-
-    ProjectEvent event = mock(ProjectEvent.class);
-
-    eventHandler.onEvent(event);
-    verify(forwarder, never()).send(event);
-  }
-
-  @Test
-  public void tesEventTaskToString() throws Exception {
-    Event event = new RefUpdatedEvent();
-    EventTask task = eventHandler.new EventTask(event);
-    assertThat(task.toString())
-        .isEqualTo(String.format("Send event '%s' to target instance", event.type));
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
index 704401b..73ef353 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
@@ -14,13 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -33,7 +29,6 @@
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedEventHandlerTest {
@@ -56,45 +51,4 @@
     handler.dispatch(event);
     verify(dispatcherMock).postEvent(event);
   }
-
-  @Test
-  public void shouldSetAndUnsetForwardedContext() throws Exception {
-    Event event = new ProjectCreatedEvent();
-    // this doAnswer is to allow to assert that context is set to forwarded
-    // while cache eviction is called.
-    doAnswer(
-            (Answer<Void>)
-                invocation -> {
-                  assertThat(Context.isForwardedEvent()).isTrue();
-                  return null;
-                })
-        .when(dispatcherMock)
-        .postEvent(event);
-
-    assertThat(Context.isForwardedEvent()).isFalse();
-    handler.dispatch(event);
-    assertThat(Context.isForwardedEvent()).isFalse();
-
-    verify(dispatcherMock).postEvent(event);
-  }
-
-  @Test
-  public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
-    Event event = new ProjectCreatedEvent();
-    doAnswer(
-            (Answer<Void>)
-                invocation -> {
-                  assertThat(Context.isForwardedEvent()).isTrue();
-                  throw new StorageException("someMessage");
-                })
-        .when(dispatcherMock)
-        .postEvent(event);
-
-    assertThat(Context.isForwardedEvent()).isFalse();
-    StorageException thrown = assertThrows(StorageException.class, () -> handler.dispatch(event));
-    assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
-    assertThat(Context.isForwardedEvent()).isFalse();
-
-    verify(dispatcherMock).postEvent(event);
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
deleted file mode 100644
index 3fd13c6..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.index;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.index.IndexEventHandler.IndexProjectTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class IndexEventHandlerTest {
-
-  private static final String INSTANCE_ID = "instance-id";
-
-  private IndexEventHandler eventHandler;
-
-  @Mock private ProjectsFilter projectsFilter;
-  @Mock private IndexEventForwarder forwarder;
-  @Mock private ChangeCheckerImpl.Factory changeChecker;
-
-  @Before
-  public void setUp() {
-    eventHandler =
-        new IndexEventHandler(
-            MoreExecutors.directExecutor(),
-            asDynamicSet(forwarder),
-            changeChecker,
-            projectsFilter,
-            new TestGroupChecker(true),
-            INSTANCE_ID);
-  }
-
-  private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
-    DynamicSet<IndexEventForwarder> result = new DynamicSet<>();
-    result.add("multi-site", forwarder);
-    return result;
-  }
-
-  @Test
-  public void shouldNotForwardProjectIndexedIfFilteredOutByProjectName() throws Exception {
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
-    eventHandler.onProjectIndexed("test_project");
-    verify(forwarder, never())
-        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
-  }
-
-  @Test
-  public void shouldNotForwardIndexChangeIfFilteredOutByProjectName() throws Exception {
-    int changeId = 1;
-    when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
-    eventHandler.onChangeIndexed("test_project", changeId);
-    verifyZeroInteractions(changeChecker);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index 917c6bf..384db84 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -24,11 +24,9 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.project.ProjectConfig;
@@ -36,7 +34,6 @@
 import com.google.gerrit.testing.InMemoryTestEnvironment;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -49,7 +46,6 @@
 import org.eclipse.jgit.lib.ObjectLoader;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,24 +56,24 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ProjectVersionRefUpdateTest implements RefFixture {
 
+  private static final String DEFAULT_INSTANCE_ID = "instance-id";
+
   @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
 
   @Mock RefUpdatedEvent refUpdatedEvent;
   @Mock SharedRefDatabaseWrapper sharedRefDb;
   @Mock GitReferenceUpdated gitReferenceUpdated;
   @Mock ProjectVersionLogger verLogger;
-  @Mock ProjectsFilter projectsFilter;
 
   @Inject private ProjectConfig.Factory projectConfigFactory;
   @Inject private InMemoryRepositoryManager repoManager;
-
   private TestRepository<InMemoryRepository> repo;
   private ProjectConfig project;
   private RevCommit masterCommit;
 
   @Before
   public void setUp() throws Exception {
-    when(projectsFilter.matches(any(Event.class))).thenReturn(true);
+    refUpdatedEvent.instanceId = DEFAULT_INSTANCE_ID;
     InMemoryRepository inMemoryRepo = repoManager.createRepository(A_TEST_PROJECT_NAME_KEY);
     project = projectConfigFactory.create(A_TEST_PROJECT_NAME_KEY);
     project.load(inMemoryRepo);
@@ -85,14 +81,8 @@
     masterCommit = repo.branch("master").commit().create();
   }
 
-  @After
-  public void tearDown() {
-    Context.unsetForwardedEvent();
-  }
-
   @Test
   public void producerShouldUpdateProjectVersionUponRefUpdatedEvent() throws IOException {
-    Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
@@ -111,7 +101,7 @@
     when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
 
     new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
         .onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -130,8 +120,6 @@
 
   @Test
   public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
-    Context.setForwardedEvent(false);
-
     Thread.sleep(1000L);
     RevCommit masterPlusOneCommit = repo.branch("master").commit().create();
 
@@ -156,7 +144,7 @@
     when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
 
     new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
         .onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -176,7 +164,6 @@
   @Test
   public void producerShouldCreateNewProjectVersionWhenMissingUponRefUpdatedEvent()
       throws IOException {
-    Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
@@ -196,7 +183,7 @@
     when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
 
     new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
         .onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -233,13 +220,27 @@
   private void producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(String magicRefPrefix)
       throws Exception {
     String magicRefName = magicRefPrefix + "/foo";
-    Context.setForwardedEvent(false);
     when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
     when(refUpdatedEvent.getRefName()).thenReturn(magicRefName);
     repo.branch(magicRefName).commit().create();
 
     new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
+        .onEvent(refUpdatedEvent);
+
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+    assertThat(ref).isNull();
+
+    verifyZeroInteractions(verLogger);
+  }
+
+  @Test
+  public void producerShouldNotUpdateProjectVersionUponForwardedRefUpdatedEvent()
+      throws IOException {
+    refUpdatedEvent.instanceId = "instance-id-2";
+
+    new ProjectVersionRefUpdate(
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
         .onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -250,34 +251,11 @@
 
   @Test
   public void shouldNotUpdateProjectVersionWhenProjectDoesntExist() throws IOException {
-    Context.setForwardedEvent(false);
     when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject"));
     when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
 
     new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
-
-    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
-    assertThat(ref).isNull();
-
-    verifyZeroInteractions(verLogger);
-  }
-
-  @Test
-  public void shouldNotUpdateProjectVersionWhenProjectFilteredOut() throws Exception {
-    when(projectsFilter.matches(any(Event.class))).thenReturn(false);
-
-    Context.setForwardedEvent(false);
-
-    Thread.sleep(1000L);
-    repo.branch("master").commit().create();
-
-    Thread.sleep(1000L);
-    repo.branch("master").update(masterCommit);
-
-    new ProjectVersionRefUpdate(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
         .onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -293,7 +271,7 @@
 
     Optional<Long> version =
         new ProjectVersionRefUpdate(
-                repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+                repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
             .getProjectRemoteVersion(A_TEST_PROJECT_NAME);
 
     assertThat(version.isPresent()).isTrue();