Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Add 'Forwarded-BatchIndex-Event' to events skipped from high-availability
  Do not forward events from high-availability

Change-Id: Ib8764535ef9c2518d05c9e33fc5228a6ee73bed8
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 77c19c1..2990264 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -20,6 +20,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 import java.util.concurrent.Executor;
 
@@ -45,7 +46,7 @@
     }
   }
 
-  class CacheEvictionTask implements Runnable {
+  class CacheEvictionTask extends ForwarderTask {
     CacheEvictionEvent cacheEvictionEvent;
 
     CacheEvictionTask(CacheEvictionEvent cacheEvictionEvent) {
@@ -54,7 +55,7 @@
 
     @Override
     public void run() {
-      forwarders.forEach(f -> f.evict(cacheEvictionEvent));
+      forwarders.forEach(f -> f.evict(this, cacheEvictionEvent));
     }
 
     @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index 5723fbe..1060977 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -21,6 +21,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 import java.util.concurrent.Executor;
@@ -57,7 +58,7 @@
     }
   }
 
-  class ProjectListUpdateTask implements Runnable {
+  class ProjectListUpdateTask extends ForwarderTask {
     private final ProjectListUpdateEvent projectListUpdateEvent;
 
     ProjectListUpdateTask(ProjectListUpdateEvent projectListUpdateEvent) {
@@ -66,7 +67,7 @@
 
     @Override
     public void run() {
-      forwarders.forEach(f -> f.updateProjectList(projectListUpdateEvent));
+      forwarders.forEach(f -> f.updateProjectList(this, projectListUpdateEvent));
     }
 
     @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
index 68921e6..2a18759 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
@@ -20,8 +20,9 @@
   /**
    * Forward a cache eviction event to the other master.
    *
+   * @param task that triggered the forwarding of the cache event.
    * @param cacheEvictionEvent the details of the cache eviction event.
    * @return true if successful, otherwise false.
    */
-  boolean evict(CacheEvictionEvent cacheEvictionEvent);
+  boolean evict(ForwarderTask task, CacheEvictionEvent cacheEvictionEvent);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
new file mode 100644
index 0000000..329b5cb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+public abstract class ForwarderTask implements Runnable { // Runnable that records its creator
+  private final Thread callerThread = Thread.currentThread(); // captured at construction time
+
+  public Thread getCallerThread() { // thread that instantiated this task, fixed for its lifetime
+    return callerThread;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 5c1f444..73cd9e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -21,16 +21,18 @@
   /**
    * Publish an indexing event to the broker using interactive topic.
    *
+   * @param task that triggered the forwarding of the index event.
    * @param event the details of the index event.
    * @return true if successful, otherwise false.
    */
-  boolean index(IndexEvent event);
+  boolean index(ForwarderTask task, IndexEvent event);
 
   /**
    * Publish an indexing event to the broker using batch topic.
    *
+   * @param task that triggered the forwarding of the index event.
    * @param event the details of the index event.
    * @return true if successful, otherwise false.
    */
-  boolean batchIndex(IndexEvent event);
+  boolean batchIndex(ForwarderTask task, IndexEvent event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
index e40ff3f..b370b260d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
@@ -21,8 +21,9 @@
   /**
    * Forward an update the project list cache event to the other master.
    *
+   * @param task that triggered the forwarding of the project list event.
    * @param projectListUpdateEvent the content of project list update event
    * @return true if successful, otherwise false.
    */
-  boolean updateProjectList(ProjectListUpdateEvent projectListUpdateEvent);
+  boolean updateProjectList(ForwarderTask task, ProjectListUpdateEvent projectListUpdateEvent);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index b32e5ae..1e2a48f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -19,22 +19,21 @@
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 
 @Singleton
-public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
-  private final BrokerApiWrapper broker;
-  private final Configuration cfg;
+public class BrokerCacheEvictionForwarder extends BrokerForwarder
+    implements CacheEvictionForwarder {
 
   @Inject
   BrokerCacheEvictionForwarder(BrokerApiWrapper broker, Configuration cfg) {
-    this.broker = broker;
-    this.cfg = cfg;
+    super(broker, cfg); // broker and cfg are now held by BrokerForwarder
   }
 
   @Override
-  public boolean evict(CacheEvictionEvent event) {
-    return broker.send(EventTopic.CACHE_TOPIC.topic(cfg), event);
+  public boolean evict(ForwarderTask task, CacheEvictionEvent event) {
+    return send(task, EventTopic.CACHE_TOPIC, event); // inherited send() filters on task
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
new file mode 100644
index 0000000..3522731
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+
+public abstract class BrokerForwarder { // common send() logic shared by the broker forwarders
+  private static final CharSequence HIGH_AVAILABILITY_PLUGIN = "/plugins/high-availability/";
+  private static final CharSequence HIGH_AVAILABILITY_FORWARDER = "Forwarded-Index-Event";
+  private static final CharSequence HIGH_AVAILABILITY_BATCH_FORWARDER =
+      "Forwarded-BatchIndex-Event"; // all three markers are looked up in a thread name
+
+  private final BrokerApiWrapper broker;
+  private final Configuration cfg;
+
+  protected BrokerForwarder(BrokerApiWrapper broker, Configuration cfg) {
+    this.broker = broker;
+    this.cfg = cfg;
+  }
+
+  protected boolean currentThreadBelongsToHighAvailabilityPlugin(ForwarderTask task) {
+    String currentThreadName = task.getCallerThread().getName(); // the task's caller thread
+
+    return currentThreadName.contains(HIGH_AVAILABILITY_PLUGIN)
+        || currentThreadName.contains(HIGH_AVAILABILITY_FORWARDER)
+        || currentThreadName.contains(HIGH_AVAILABILITY_BATCH_FORWARDER);
+  }
+
+  protected boolean send(ForwarderTask task, EventTopic eventTopic, MultiSiteEvent event) {
+    // Events generated by the high-availability plugin should be
+    // discarded. Sending them around would cause infinite loops.
+    if (currentThreadBelongsToHighAvailabilityPlugin(task)) {
+      return true; // reported as success although the broker is never called
+    }
+
+    return broker.send(eventTopic.topic(cfg), event); // topic name is resolved through cfg
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index a86c62e..8321b94 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -17,27 +17,25 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 
-public class BrokerIndexEventForwarder implements IndexEventForwarder {
-  private final BrokerApiWrapper broker;
-  private final Configuration cfg;
+public class BrokerIndexEventForwarder extends BrokerForwarder implements IndexEventForwarder {
 
   @Inject
   BrokerIndexEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
-    this.broker = broker;
-    this.cfg = cfg;
+    super(broker, cfg); // broker and cfg are now held by BrokerForwarder
   }
 
   @Override
-  public boolean index(IndexEvent event) {
-    return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
+  public boolean index(ForwarderTask task, IndexEvent event) {
+    return send(task, EventTopic.INDEX_TOPIC, event); // inherited send() filters on task
   }
 
   @Override
-  public boolean batchIndex(IndexEvent event) {
-    return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(cfg), event);
+  public boolean batchIndex(ForwarderTask task, IndexEvent event) {
+    return send(task, EventTopic.BATCH_INDEX_TOPIC, event); // same filter, batch topic
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 34e0300..dc8139d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -20,22 +20,21 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 
 @Singleton
-public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
-  private final BrokerApiWrapper broker;
-  private final Configuration cfg;
+public class BrokerProjectListUpdateForwarder extends BrokerForwarder
+    implements ProjectListUpdateForwarder {
 
   @Inject
   BrokerProjectListUpdateForwarder(BrokerApiWrapper broker, Configuration cfg) {
-    this.broker = broker;
-    this.cfg = cfg;
+    super(broker, cfg); // broker and cfg are now held by BrokerForwarder
   }
 
   @Override
-  public boolean updateProjectList(ProjectListUpdateEvent event) {
-    return broker.send(PROJECT_LIST_TOPIC.topic(cfg), event);
+  public boolean updateProjectList(ForwarderTask task, ProjectListUpdateEvent event) {
+    return send(task, PROJECT_LIST_TOPIC, event); // inherited send() filters on task
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 649074b..fdc3b28 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
@@ -134,7 +135,8 @@
     }
   }
 
-  abstract class IndexTask implements Runnable {
+  abstract class IndexTask extends ForwarderTask {
+
     @Override
     public void run() {
       queuedTasks.remove(this);
@@ -153,7 +155,7 @@
 
     @Override
     public void execute() {
-      forwarders.forEach(f -> f.index(changeIndexEvent));
+      forwarders.forEach(f -> f.index(this, changeIndexEvent));
     }
 
     @Override
@@ -184,7 +186,7 @@
 
     @Override
     public void execute() {
-      forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+      forwarders.forEach(f -> f.batchIndex(this, changeIndexEvent));
     }
 
     @Override
@@ -215,7 +217,7 @@
 
     @Override
     public void execute() {
-      forwarders.forEach(f -> f.index(accountIndexEvent));
+      forwarders.forEach(f -> f.index(this, accountIndexEvent));
     }
 
     @Override
@@ -246,7 +248,7 @@
 
     @Override
     public void execute() {
-      forwarders.forEach(f -> f.index(groupIndexEvent));
+      forwarders.forEach(f -> f.index(this, groupIndexEvent));
     }
 
     @Override
@@ -277,7 +279,7 @@
 
     @Override
     public void execute() {
-      forwarders.forEach(f -> f.index(projectIndexEvent));
+      forwarders.forEach(f -> f.index(this, projectIndexEvent));
     }
 
     @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index bf4b4cf..b86f8af 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.multisite.cache;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -59,7 +61,9 @@
     NewProjectCreatedListener.Event event = mock(NewProjectCreatedListener.Event.class);
     when(event.getProjectName()).thenReturn(projectName);
     handler.onNewProjectCreated(event);
-    verify(forwarder).updateProjectList(new ProjectListUpdateEvent(projectName, false));
+    verify(forwarder)
+        .updateProjectList(
+            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
   }
 
   @Test
@@ -68,7 +72,9 @@
     ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
     when(event.getProjectName()).thenReturn(projectName);
     handler.onProjectDeleted(event);
-    verify(forwarder).updateProjectList(new ProjectListUpdateEvent(projectName, true));
+    verify(forwarder)
+        .updateProjectList(
+            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
new file mode 100644
index 0000000..e3d1ae0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -0,0 +1,140 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerForwarderTest { // checks which caller-thread names suppress broker sends
+  private static final String HIGH_AVAILABILITY_PLUGIN = "/plugins/high-availability/";
+  private static final String HIGH_AVAILABILITY_FORWARDED = "Forwarded-Index-Event";
+  private static final String HIGH_AVAILABILITY_BATCH_FORWARDED = "Forwarded-BatchIndex-Event";
+  private static final long TEST_TIMEOUT_SEC = 5L; // upper bound when waiting on the executor
+
+  @Mock private BrokerApiWrapper brokerMock;
+
+  private TestBrokerForwarder brokerForwarder;
+
+  private Configuration cfg;
+
+  private EventTopic testTopic;
+
+  private String testTopicName;
+
+  private TestEvent testEvent;
+
+  private ExecutorService executor; // single worker whose thread name the tests overwrite
+
+  public class TestBrokerForwarder extends BrokerForwarder { // opens protected send() to tests
+
+    TestBrokerForwarder() {
+      super(brokerMock, cfg);
+    }
+
+    public void send(ForwarderTask task, EventTopic eventTopic, TestEvent testEvent) {
+      super.send(task, eventTopic, testEvent); // boolean result is intentionally dropped
+    }
+  }
+
+  public class TestEvent extends MultiSiteEvent {
+
+    protected TestEvent() {
+      super("test");
+    }
+  }
+
+  @Before
+  public void setup() {
+    cfg = new Configuration(new Config(), new Config());
+    testTopic = EventTopic.INDEX_TOPIC;
+    testTopicName = testTopic.topic(cfg);
+    testEvent = new TestEvent();
+    brokerForwarder = new TestBrokerForwarder(); // must follow cfg: the constructor reads it
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @After
+  public void teardown() {
+    executor.shutdown();
+  }
+
+  @Test
+  public void shouldSendEventToBrokerFromGenericSourceThread() {
+    brokerForwarder.send(newForwarderTask(), testTopic, testEvent); // task built on this thread
+    verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+  }
+
+  @Test
+  public void shouldSkipEventFromHighAvailabilityPluginThread() {
+    brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_PLUGIN), testTopic, testEvent);
+    verifyZeroInteractions(brokerMock);
+  }
+
+  @Test
+  public void shouldSkipEventFromHighAvailabilityPluginForwardedThread() {
+    brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_FORWARDED), testTopic, testEvent);
+
+    verifyZeroInteractions(brokerMock);
+  }
+
+  @Test
+  public void shouldSkipEventFromHighAvailabilityPluginBatchForwardedThread() {
+    brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_BATCH_FORWARDED), testTopic, testEvent);
+
+    verifyZeroInteractions(brokerMock);
+  }
+
+  private ForwarderTask newForwarderTask(String threadName) { // task built on a renamed thread
+    try {
+      return executor
+          .submit(
+              () -> {
+                Thread.currentThread().setName(threadName); // rename before the task is built
+                return newForwarderTask(); // records the renamed worker as its caller thread
+              })
+          .get(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ForwarderTask newForwarderTask() { // no-op task built on the calling thread
+    return new ForwarderTask() {
+
+      @Override
+      public void run() {}
+    };
+  }
+}