Merge branch 'stable-3.0' into stable-3.1
* stable-3.0:
Add 'Forwarded-BatchIndex-Event' to events skipped from high-availability
Do not forward events from high-availability
Change-Id: Ib8764535ef9c2518d05c9e33fc5228a6ee73bed8
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 77c19c1..2990264 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -20,6 +20,7 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
import java.util.concurrent.Executor;
@@ -45,7 +46,7 @@
}
}
- class CacheEvictionTask implements Runnable {
+ class CacheEvictionTask extends ForwarderTask {
CacheEvictionEvent cacheEvictionEvent;
CacheEvictionTask(CacheEvictionEvent cacheEvictionEvent) {
@@ -54,7 +55,7 @@
@Override
public void run() {
- forwarders.forEach(f -> f.evict(cacheEvictionEvent));
+ forwarders.forEach(f -> f.evict(this, cacheEvictionEvent));
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index 5723fbe..1060977 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -21,6 +21,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
import java.util.concurrent.Executor;
@@ -57,7 +58,7 @@
}
}
- class ProjectListUpdateTask implements Runnable {
+ class ProjectListUpdateTask extends ForwarderTask {
private final ProjectListUpdateEvent projectListUpdateEvent;
ProjectListUpdateTask(ProjectListUpdateEvent projectListUpdateEvent) {
@@ -66,7 +67,7 @@
@Override
public void run() {
- forwarders.forEach(f -> f.updateProjectList(projectListUpdateEvent));
+ forwarders.forEach(f -> f.updateProjectList(this, projectListUpdateEvent));
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
index 68921e6..2a18759 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEvictionForwarder.java
@@ -20,8 +20,9 @@
/**
* Forward a cache eviction event to the other master.
*
+ * @param task that triggered the forwarding of the cache event.
* @param cacheEvictionEvent the details of the cache eviction event.
* @return true if successful, otherwise false.
*/
- boolean evict(CacheEvictionEvent cacheEvictionEvent);
+ boolean evict(ForwarderTask task, CacheEvictionEvent cacheEvictionEvent);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
new file mode 100644
index 0000000..329b5cb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderTask.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+public abstract class ForwarderTask implements Runnable {
+ private final Thread callerThread = Thread.currentThread();
+
+ public Thread getCallerThread() {
+ return callerThread;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 5c1f444..73cd9e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -21,16 +21,18 @@
/**
* Publish an indexing event to the broker using interactive topic.
*
+ * @param task that triggered the forwarding of the index event.
* @param event the details of the index event.
* @return true if successful, otherwise false.
*/
- boolean index(IndexEvent event);
+ boolean index(ForwarderTask task, IndexEvent event);
/**
* Publish an indexing event to the broker using batch topic.
*
+ * @param task that triggered the forwarding of the index event.
* @param event the details of the index event.
* @return true if successful, otherwise false.
*/
- boolean batchIndex(IndexEvent event);
+ boolean batchIndex(ForwarderTask task, IndexEvent event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
index e40ff3f..b370b260d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ProjectListUpdateForwarder.java
@@ -21,8 +21,9 @@
/**
* Forward an update the project list cache event to the other master.
*
+ * @param task that triggered the forwarding of the project list event.
* @param projectListUpdateEvent the content of project list update event
* @return true if successful, otherwise false.
*/
- boolean updateProjectList(ProjectListUpdateEvent projectListUpdateEvent);
+ boolean updateProjectList(ForwarderTask task, ProjectListUpdateEvent projectListUpdateEvent);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index b32e5ae..1e2a48f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -19,22 +19,21 @@
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@Singleton
-public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
- private final BrokerApiWrapper broker;
- private final Configuration cfg;
+public class BrokerCacheEvictionForwarder extends BrokerForwarder
+ implements CacheEvictionForwarder {
@Inject
BrokerCacheEvictionForwarder(BrokerApiWrapper broker, Configuration cfg) {
- this.broker = broker;
- this.cfg = cfg;
+ super(broker, cfg);
}
@Override
- public boolean evict(CacheEvictionEvent event) {
- return broker.send(EventTopic.CACHE_TOPIC.topic(cfg), event);
+ public boolean evict(ForwarderTask task, CacheEvictionEvent event) {
+ return send(task, EventTopic.CACHE_TOPIC, event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
new file mode 100644
index 0000000..3522731
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+
+public abstract class BrokerForwarder {
+ private static final CharSequence HIGH_AVAILABILITY_PLUGIN = "/plugins/high-availability/";
+ private static final CharSequence HIGH_AVAILABILITY_FORWARDER = "Forwarded-Index-Event";
+ private static final CharSequence HIGH_AVAILABILITY_BATCH_FORWARDER =
+ "Forwarded-BatchIndex-Event";
+
+ private final BrokerApiWrapper broker;
+ private final Configuration cfg;
+
+ protected BrokerForwarder(BrokerApiWrapper broker, Configuration cfg) {
+ this.broker = broker;
+ this.cfg = cfg;
+ }
+
+ protected boolean currentThreadBelongsToHighAvailabilityPlugin(ForwarderTask task) {
+ String currentThreadName = task.getCallerThread().getName();
+
+ return currentThreadName.contains(HIGH_AVAILABILITY_PLUGIN)
+ || currentThreadName.contains(HIGH_AVAILABILITY_FORWARDER)
+ || currentThreadName.contains(HIGH_AVAILABILITY_BATCH_FORWARDER);
+ }
+
+ protected boolean send(ForwarderTask task, EventTopic eventTopic, MultiSiteEvent event) {
+ // Events generated by the high-availability plugin should be
+ // discarded. Sending them around would cause infinite loops.
+ if (currentThreadBelongsToHighAvailabilityPlugin(task)) {
+ return true;
+ }
+
+ return broker.send(eventTopic.topic(cfg), event);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index a86c62e..8321b94 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -17,27 +17,25 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
-public class BrokerIndexEventForwarder implements IndexEventForwarder {
- private final BrokerApiWrapper broker;
- private final Configuration cfg;
+public class BrokerIndexEventForwarder extends BrokerForwarder implements IndexEventForwarder {
@Inject
BrokerIndexEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
- this.broker = broker;
- this.cfg = cfg;
+ super(broker, cfg);
}
@Override
- public boolean index(IndexEvent event) {
- return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
+ public boolean index(ForwarderTask task, IndexEvent event) {
+ return send(task, EventTopic.INDEX_TOPIC, event);
}
@Override
- public boolean batchIndex(IndexEvent event) {
- return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(cfg), event);
+ public boolean batchIndex(ForwarderTask task, IndexEvent event) {
+ return send(task, EventTopic.BATCH_INDEX_TOPIC, event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 34e0300..dc8139d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -20,22 +20,21 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
@Singleton
-public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
- private final BrokerApiWrapper broker;
- private final Configuration cfg;
+public class BrokerProjectListUpdateForwarder extends BrokerForwarder
+ implements ProjectListUpdateForwarder {
@Inject
BrokerProjectListUpdateForwarder(BrokerApiWrapper broker, Configuration cfg) {
- this.broker = broker;
- this.cfg = cfg;
+ super(broker, cfg);
}
@Override
- public boolean updateProjectList(ProjectListUpdateEvent event) {
- return broker.send(PROJECT_LIST_TOPIC.topic(cfg), event);
+ public boolean updateProjectList(ForwarderTask task, ProjectListUpdateEvent event) {
+ return send(task, PROJECT_LIST_TOPIC, event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 649074b..fdc3b28 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -22,6 +22,7 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
@@ -134,7 +135,8 @@
}
}
- abstract class IndexTask implements Runnable {
+ abstract class IndexTask extends ForwarderTask {
+
@Override
public void run() {
queuedTasks.remove(this);
@@ -153,7 +155,7 @@
@Override
public void execute() {
- forwarders.forEach(f -> f.index(changeIndexEvent));
+ forwarders.forEach(f -> f.index(this, changeIndexEvent));
}
@Override
@@ -184,7 +186,7 @@
@Override
public void execute() {
- forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+ forwarders.forEach(f -> f.batchIndex(this, changeIndexEvent));
}
@Override
@@ -215,7 +217,7 @@
@Override
public void execute() {
- forwarders.forEach(f -> f.index(accountIndexEvent));
+ forwarders.forEach(f -> f.index(this, accountIndexEvent));
}
@Override
@@ -246,7 +248,7 @@
@Override
public void execute() {
- forwarders.forEach(f -> f.index(groupIndexEvent));
+ forwarders.forEach(f -> f.index(this, groupIndexEvent));
}
@Override
@@ -277,7 +279,7 @@
@Override
public void execute() {
- forwarders.forEach(f -> f.index(projectIndexEvent));
+ forwarders.forEach(f -> f.index(this, projectIndexEvent));
}
@Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index bf4b4cf..b86f8af 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -15,6 +15,8 @@
package com.googlesource.gerrit.plugins.multisite.cache;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -59,7 +61,9 @@
NewProjectCreatedListener.Event event = mock(NewProjectCreatedListener.Event.class);
when(event.getProjectName()).thenReturn(projectName);
handler.onNewProjectCreated(event);
- verify(forwarder).updateProjectList(new ProjectListUpdateEvent(projectName, false));
+ verify(forwarder)
+ .updateProjectList(
+ any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
}
@Test
@@ -68,7 +72,9 @@
ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
when(event.getProjectName()).thenReturn(projectName);
handler.onProjectDeleted(event);
- verify(forwarder).updateProjectList(new ProjectListUpdateEvent(projectName, true));
+ verify(forwarder)
+ .updateProjectList(
+ any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
new file mode 100644
index 0000000..e3d1ae0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -0,0 +1,140 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.lib.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerForwarderTest {
+ private static final String HIGH_AVAILABILITY_PLUGIN = "/plugins/high-availability/";
+ private static final String HIGH_AVAILABILITY_FORWARDED = "Forwarded-Index-Event";
+ private static final String HIGH_AVAILABILITY_BATCH_FORWARDED = "Forwarded-BatchIndex-Event";
+ private static final long TEST_TIMEOUT_SEC = 5L;
+
+ @Mock private BrokerApiWrapper brokerMock;
+
+ private TestBrokerForwarder brokerForwarder;
+
+ private Configuration cfg;
+
+ private EventTopic testTopic;
+
+ private String testTopicName;
+
+ private TestEvent testEvent;
+
+ private ExecutorService executor;
+
+ public class TestBrokerForwarder extends BrokerForwarder {
+
+ TestBrokerForwarder() {
+ super(brokerMock, cfg);
+ }
+
+ public void send(ForwarderTask task, EventTopic eventTopic, TestEvent testEvent) {
+ super.send(task, eventTopic, testEvent);
+ }
+ }
+
+ public class TestEvent extends MultiSiteEvent {
+
+ protected TestEvent() {
+ super("test");
+ }
+ }
+
+ @Before
+ public void setup() {
+ cfg = new Configuration(new Config(), new Config());
+ testTopic = EventTopic.INDEX_TOPIC;
+ testTopicName = testTopic.topic(cfg);
+ testEvent = new TestEvent();
+ brokerForwarder = new TestBrokerForwarder();
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void teardown() {
+ executor.shutdown();
+ }
+
+ @Test
+ public void shouldSendEventToBrokerFromGenericSourceThread() {
+ brokerForwarder.send(newForwarderTask(), testTopic, testEvent);
+ verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+ }
+
+ @Test
+ public void shouldSkipEventFromHighAvailabilityPluginThread() {
+ brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_PLUGIN), testTopic, testEvent);
+ verifyZeroInteractions(brokerMock);
+ }
+
+ @Test
+ public void shouldSkipEventFromHighAvailabilityPluginForwardedThread() {
+ brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_FORWARDED), testTopic, testEvent);
+
+ verifyZeroInteractions(brokerMock);
+ }
+
+ @Test
+ public void shouldSkipEventFromHighAvailabilityPluginBatchForwardedThread() {
+ brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_BATCH_FORWARDED), testTopic, testEvent);
+
+ verifyZeroInteractions(brokerMock);
+ }
+
+ private ForwarderTask newForwarderTask(String threadName) {
+ try {
+ return executor
+ .submit(
+ () -> {
+ Thread.currentThread().setName(threadName);
+ return newForwarderTask();
+ })
+ .get(TEST_TIMEOUT_SEC, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ForwarderTask newForwarderTask() {
+ return new ForwarderTask() {
+
+ @Override
+ public void run() {}
+ };
+ }
+}