Use Event.instanceId instead of forwarded event tagging with ThreadLocal
To avoid infinite events loop forwarded events are marked using
ThreadLocal. Currently all events which extends
com.google.gerrit.server.events.Event class contain gerrit.instanceId
property so tracking events with ThreadLocal in most cases is not
needed anymore because the event instanceId can always be compared
against the node instanceId. This allows to skip sending events which
are not produced by the current node. Also for project version ref
update event we can avoid consuming events created by other nodes.
Bug: Issue 14660
Change-Id: Ie455cf8d5c78476c0740087e1587c1dfc7b6a830
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index f58efa7..88422e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -22,11 +22,11 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -39,17 +39,20 @@
private final DynamicItem<BrokerApi> apiDelegate;
private final BrokerMetrics metrics;
private final MessageLogger msgLog;
+ private final String nodeInstanceId;
@Inject
public BrokerApiWrapper(
@BrokerExecutor Executor executor,
DynamicItem<BrokerApi> apiDelegate,
BrokerMetrics metrics,
- MessageLogger msgLog) {
+ MessageLogger msgLog,
+ @GerritInstanceId String instanceId) {
this.apiDelegate = apiDelegate;
this.executor = executor;
this.metrics = metrics;
this.msgLog = msgLog;
+ this.nodeInstanceId = instanceId;
}
public boolean sendSync(String topic, Event event) {
@@ -70,7 +73,7 @@
@Override
public ListenableFuture<Boolean> send(String topic, Event message) {
SettableFuture<Boolean> resultFuture = SettableFuture.create();
- if (Context.isForwardedEvent()) {
+ if (!nodeInstanceId.equals(message.instanceId)) {
resultFuture.set(true);
return resultFuture;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
index b2efb80..7cb853a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
@@ -16,11 +16,11 @@
import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
import java.util.concurrent.Executor;
@@ -28,20 +28,24 @@
private final Executor executor;
private final DynamicSet<StreamEventForwarder> forwarders;
private final ProjectsFilter projectsFilter;
+ private final String nodeInstanceId;
@Inject
EventHandler(
DynamicSet<StreamEventForwarder> forwarders,
@EventExecutor Executor executor,
- ProjectsFilter projectsFilter) {
+ ProjectsFilter projectsFilter,
+ @GerritInstanceId String nodeInstanceId) {
this.forwarders = forwarders;
this.executor = executor;
this.projectsFilter = projectsFilter;
+ this.nodeInstanceId = nodeInstanceId;
}
@Override
public void onEvent(Event event) {
- if (!Context.isForwardedEvent() && event instanceof ProjectEvent) {
+
+ if (nodeInstanceId.equals(event.instanceId) && event instanceof ProjectEvent) {
if (projectsFilter.matches(event)) {
executor.execute(new EventTask(event));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
index dcfd1b0..e2e3954 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
@@ -51,11 +51,8 @@
*/
public void dispatch(Event event) throws PermissionBackendException {
try (ManualRequestContext ctx = oneOffCtx.open()) {
- Context.setForwardedEvent(true);
log.debug("dispatching event {}", event.getType());
dispatcher.get().postEvent(event);
- } finally {
- Context.unsetForwardedEvent();
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index ea7dada..a23367b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -24,6 +24,7 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.RefUpdatedEvent;
@@ -33,7 +34,6 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.io.IOException;
import java.util.Optional;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -59,6 +59,7 @@
private final GitReferenceUpdated gitReferenceUpdated;
private final ProjectVersionLogger verLogger;
private final ProjectsFilter projectsFilter;
+ private final String nodeInstanceId;
protected final SharedRefDatabaseWrapper sharedRefDb;
@@ -68,19 +69,21 @@
SharedRefDatabaseWrapper sharedRefDb,
GitReferenceUpdated gitReferenceUpdated,
ProjectVersionLogger verLogger,
- ProjectsFilter projectsFilter) {
+ ProjectsFilter projectsFilter,
+ @GerritInstanceId String nodeInstanceId) {
this.gitRepositoryManager = gitRepositoryManager;
this.sharedRefDb = sharedRefDb;
this.gitReferenceUpdated = gitReferenceUpdated;
this.verLogger = verLogger;
this.projectsFilter = projectsFilter;
+ this.nodeInstanceId = nodeInstanceId;
}
@Override
public void onEvent(Event event) {
logger.atFine().log("Processing event type: " + event.type);
// Producer of the Event use RefUpdatedEvent to trigger the version update
- if (!Context.isForwardedEvent() && event instanceof RefUpdatedEvent) {
+ if (nodeInstanceId.equals(event.instanceId) && event instanceof RefUpdatedEvent) {
if (projectsFilter.matches(event)) {
updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 7d1751c..50f55b2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -22,6 +22,7 @@
@RunWith(MockitoJUnitRunner.class)
public class BrokerApiWrapperTest {
+ private static final String DEFAULT_INSTANCE_ID = "instance-id";
@Mock private BrokerMetrics brokerMetrics;
@Mock private BrokerApi brokerApi;
@Mock Event event;
@@ -32,13 +33,14 @@
@Before
public void setUp() {
- event.instanceId = "instance-id";
+ event.instanceId = DEFAULT_INSTANCE_ID;
objectUnderTest =
new BrokerApiWrapper(
MoreExecutors.directExecutor(),
DynamicItem.itemOf(BrokerApi.class, brokerApi),
brokerMetrics,
- msgLog);
+ msgLog,
+ DEFAULT_INSTANCE_ID);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
index b9b416d..2d73bf2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
@@ -29,7 +29,6 @@
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.googlesource.gerrit.plugins.multisite.event.EventHandler.EventTask;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
import org.junit.Before;
import org.junit.Test;
@@ -39,6 +38,7 @@
@RunWith(MockitoJUnitRunner.class)
public class EventHandlerTest {
+ private static final String DEFAULT_INSTANCE_ID = "instance-id";
private EventHandler eventHandler;
@@ -49,7 +49,11 @@
public void setUp() {
when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(true);
eventHandler =
- new EventHandler(asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
+ new EventHandler(
+ asDynamicSet(forwarder),
+ MoreExecutors.directExecutor(),
+ projectsFilter,
+ DEFAULT_INSTANCE_ID);
}
private DynamicSet<StreamEventForwarder> asDynamicSet(StreamEventForwarder forwarder) {
@@ -61,6 +65,7 @@
@Test
public void shouldForwardAnyProjectEvent() throws Exception {
ProjectEvent event = mock(ProjectEvent.class);
+ event.instanceId = DEFAULT_INSTANCE_ID;
eventHandler.onEvent(event);
verify(forwarder).send(event);
}
@@ -73,9 +78,9 @@
@Test
public void shouldNotForwardIfAlreadyForwardedEvent() throws Exception {
- Context.setForwardedEvent(true);
- eventHandler.onEvent(mock(ProjectEvent.class));
- Context.unsetForwardedEvent();
+ Event event = mock(ProjectEvent.class);
+ event.instanceId = "instance-id-2";
+ eventHandler.onEvent(event);
verifyZeroInteractions(forwarder);
}
@@ -84,7 +89,7 @@
when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(false);
ProjectEvent event = mock(ProjectEvent.class);
-
+ event.instanceId = DEFAULT_INSTANCE_ID;
eventHandler.onEvent(event);
verify(forwarder, never()).send(event);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
index 704401b..73ef353 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
@@ -14,13 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder;
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.gerrit.exceptions.StorageException;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventDispatcher;
@@ -33,7 +29,6 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class ForwardedEventHandlerTest {
@@ -56,45 +51,4 @@
handler.dispatch(event);
verify(dispatcherMock).postEvent(event);
}
-
- @Test
- public void shouldSetAndUnsetForwardedContext() throws Exception {
- Event event = new ProjectCreatedEvent();
- // this doAnswer is to allow to assert that context is set to forwarded
- // while cache eviction is called.
- doAnswer(
- (Answer<Void>)
- invocation -> {
- assertThat(Context.isForwardedEvent()).isTrue();
- return null;
- })
- .when(dispatcherMock)
- .postEvent(event);
-
- assertThat(Context.isForwardedEvent()).isFalse();
- handler.dispatch(event);
- assertThat(Context.isForwardedEvent()).isFalse();
-
- verify(dispatcherMock).postEvent(event);
- }
-
- @Test
- public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
- Event event = new ProjectCreatedEvent();
- doAnswer(
- (Answer<Void>)
- invocation -> {
- assertThat(Context.isForwardedEvent()).isTrue();
- throw new StorageException("someMessage");
- })
- .when(dispatcherMock)
- .postEvent(event);
-
- assertThat(Context.isForwardedEvent()).isFalse();
- StorageException thrown = assertThrows(StorageException.class, () -> handler.dispatch(event));
- assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
- assertThat(Context.isForwardedEvent()).isFalse();
-
- verify(dispatcherMock).postEvent(event);
- }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index 917c6bf..d1697de 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -36,7 +36,6 @@
import com.google.gerrit.testing.InMemoryTestEnvironment;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -49,7 +48,6 @@
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.revwalk.RevCommit;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -60,6 +58,8 @@
@RunWith(MockitoJUnitRunner.class)
public class ProjectVersionRefUpdateTest implements RefFixture {
+ private static final String DEFAULT_INSTANCE_ID = "instance-id";
+
@Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
@Mock RefUpdatedEvent refUpdatedEvent;
@@ -70,7 +70,6 @@
@Inject private ProjectConfig.Factory projectConfigFactory;
@Inject private InMemoryRepositoryManager repoManager;
-
private TestRepository<InMemoryRepository> repo;
private ProjectConfig project;
private RevCommit masterCommit;
@@ -78,6 +77,7 @@
@Before
public void setUp() throws Exception {
when(projectsFilter.matches(any(Event.class))).thenReturn(true);
+ refUpdatedEvent.instanceId = DEFAULT_INSTANCE_ID;
InMemoryRepository inMemoryRepo = repoManager.createRepository(A_TEST_PROJECT_NAME_KEY);
project = projectConfigFactory.create(A_TEST_PROJECT_NAME_KEY);
project.load(inMemoryRepo);
@@ -85,14 +85,8 @@
masterCommit = repo.branch("master").commit().create();
}
- @After
- public void tearDown() {
- Context.unsetForwardedEvent();
- }
-
@Test
public void producerShouldUpdateProjectVersionUponRefUpdatedEvent() throws IOException {
- Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
@@ -111,7 +105,12 @@
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -130,8 +129,6 @@
@Test
public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
- Context.setForwardedEvent(false);
-
Thread.sleep(1000L);
RevCommit masterPlusOneCommit = repo.branch("master").commit().create();
@@ -156,7 +153,12 @@
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -176,7 +178,6 @@
@Test
public void producerShouldCreateNewProjectVersionWhenMissingUponRefUpdatedEvent()
throws IOException {
- Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
@@ -196,7 +197,12 @@
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -233,13 +239,37 @@
private void producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(String magicRefPrefix)
throws Exception {
String magicRefName = magicRefPrefix + "/foo";
- Context.setForwardedEvent(false);
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
when(refUpdatedEvent.getRefName()).thenReturn(magicRefName);
repo.branch(magicRefName).commit().create();
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
+ .onEvent(refUpdatedEvent);
+
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+ assertThat(ref).isNull();
+
+ verifyZeroInteractions(verLogger);
+ }
+
+ @Test
+ public void producerShouldNotUpdateProjectVersionUponForwardedRefUpdatedEvent()
+ throws IOException {
+ refUpdatedEvent.instanceId = "instance-id-2";
+
+ new ProjectVersionRefUpdate(
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -250,12 +280,16 @@
@Test
public void shouldNotUpdateProjectVersionWhenProjectDoesntExist() throws IOException {
- Context.setForwardedEvent(false);
when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject"));
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -268,8 +302,6 @@
public void shouldNotUpdateProjectVersionWhenProjectFilteredOut() throws Exception {
when(projectsFilter.matches(any(Event.class))).thenReturn(false);
- Context.setForwardedEvent(false);
-
Thread.sleep(1000L);
repo.branch("master").commit().create();
@@ -277,7 +309,12 @@
repo.branch("master").update(masterCommit);
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -293,7 +330,12 @@
Optional<Long> version =
new ProjectVersionRefUpdate(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ repoManager,
+ sharedRefDb,
+ gitReferenceUpdated,
+ verLogger,
+ projectsFilter,
+ DEFAULT_INSTANCE_ID)
.getProjectRemoteVersion(A_TEST_PROJECT_NAME);
assertThat(version.isPresent()).isTrue();