Merge branch 'stable-3.4'

* stable-3.4:
  Fix merge issue for ForwardedIndexGroupHandlerTest class
  Add 'Forwarded-BatchIndex-Event' to events skipped from high-availability
  Do not forward events from high-availability
  Fix cache eviction for projects cache
  Adjust tests to reflect real life situation
  Honour index retries when indexing groups

Change-Id: I7175e2fd8da2546e11655e63316720e562b14a2d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java index f58efa7..88422e1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -22,11 +22,11 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.server.config.GerritInstanceId; import com.google.gerrit.server.events.Event; import com.google.inject.Inject; import com.googlesource.gerrit.plugins.multisite.MessageLogger; import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction; -import com.googlesource.gerrit.plugins.multisite.forwarder.Context; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Consumer; @@ -39,17 +39,20 @@ private final DynamicItem<BrokerApi> apiDelegate; private final BrokerMetrics metrics; private final MessageLogger msgLog; + private final String nodeInstanceId; @Inject public BrokerApiWrapper( @BrokerExecutor Executor executor, DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics, - MessageLogger msgLog) { + MessageLogger msgLog, + @GerritInstanceId String instanceId) { this.apiDelegate = apiDelegate; this.executor = executor; this.metrics = metrics; this.msgLog = msgLog; + this.nodeInstanceId = instanceId; } public boolean sendSync(String topic, Event event) { @@ -70,7 +73,7 @@ @Override public ListenableFuture<Boolean> send(String topic, Event message) { SettableFuture<Boolean> resultFuture = SettableFuture.create(); - if (Context.isForwardedEvent()) { + if (!nodeInstanceId.equals(message.instanceId)) { resultFuture.set(true); return resultFuture; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java index b2efb80..7cb853a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
@@ -16,11 +16,11 @@ import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter; import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.config.GerritInstanceId; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventListener; import com.google.gerrit.server.events.ProjectEvent; import com.google.inject.Inject; -import com.googlesource.gerrit.plugins.multisite.forwarder.Context; import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder; import java.util.concurrent.Executor; @@ -28,20 +28,24 @@ private final Executor executor; private final DynamicSet<StreamEventForwarder> forwarders; private final ProjectsFilter projectsFilter; + private final String nodeInstanceId; @Inject EventHandler( DynamicSet<StreamEventForwarder> forwarders, @EventExecutor Executor executor, - ProjectsFilter projectsFilter) { + ProjectsFilter projectsFilter, + @GerritInstanceId String nodeInstanceId) { this.forwarders = forwarders; this.executor = executor; this.projectsFilter = projectsFilter; + this.nodeInstanceId = nodeInstanceId; } @Override public void onEvent(Event event) { - if (!Context.isForwardedEvent() && event instanceof ProjectEvent) { + + if (nodeInstanceId.equals(event.instanceId) && event instanceof ProjectEvent) { if (projectsFilter.matches(event)) { executor.execute(new EventTask(event)); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java index dcfd1b0..e2e3954 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
@@ -51,11 +51,8 @@ */ public void dispatch(Event event) throws PermissionBackendException { try (ManualRequestContext ctx = oneOffCtx.open()) { - Context.setForwardedEvent(true); log.debug("dispatching event {}", event.getType()); dispatcher.get().postEvent(event); - } finally { - Context.unsetForwardedEvent(); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java index ea7dada..a23367b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -24,6 +24,7 @@ import com.google.common.flogger.FluentLogger; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; +import com.google.gerrit.server.config.GerritInstanceId; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventListener; import com.google.gerrit.server.events.RefUpdatedEvent; @@ -33,7 +34,6 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger; -import com.googlesource.gerrit.plugins.multisite.forwarder.Context; import java.io.IOException; import java.util.Optional; import org.eclipse.jgit.errors.RepositoryNotFoundException; @@ -59,6 +59,7 @@ private final GitReferenceUpdated gitReferenceUpdated; private final ProjectVersionLogger verLogger; private final ProjectsFilter projectsFilter; + private final String nodeInstanceId; protected final SharedRefDatabaseWrapper sharedRefDb; @@ -68,19 +69,21 @@ SharedRefDatabaseWrapper sharedRefDb, GitReferenceUpdated gitReferenceUpdated, ProjectVersionLogger verLogger, - ProjectsFilter projectsFilter) { + ProjectsFilter projectsFilter, + @GerritInstanceId String nodeInstanceId) { this.gitRepositoryManager = gitRepositoryManager; this.sharedRefDb = sharedRefDb; this.gitReferenceUpdated = gitReferenceUpdated; this.verLogger = verLogger; this.projectsFilter = projectsFilter; + this.nodeInstanceId = nodeInstanceId; } @Override public void onEvent(Event event) { logger.atFine().log("Processing event type: " + event.type); // Producer of the Event use RefUpdatedEvent to trigger the version update - if (!Context.isForwardedEvent() && event instanceof RefUpdatedEvent) { + if (nodeInstanceId.equals(event.instanceId) && event instanceof RefUpdatedEvent) { if (projectsFilter.matches(event)) { updateProducerProjectVersionUpdate((RefUpdatedEvent) event); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java index 7d1751c..50f55b2 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -22,6 +22,7 @@ @RunWith(MockitoJUnitRunner.class) public class BrokerApiWrapperTest { + private static final String DEFAULT_INSTANCE_ID = "instance-id"; @Mock private BrokerMetrics brokerMetrics; @Mock private BrokerApi brokerApi; @Mock Event event; @@ -32,13 +33,14 @@ @Before public void setUp() { - event.instanceId = "instance-id"; + event.instanceId = DEFAULT_INSTANCE_ID; objectUnderTest = new BrokerApiWrapper( MoreExecutors.directExecutor(), DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, - msgLog); + msgLog, + DEFAULT_INSTANCE_ID); } @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java index b9b416d..2d73bf2 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
@@ -29,7 +29,6 @@ import com.google.gerrit.server.events.ProjectEvent; import com.google.gerrit.server.events.RefUpdatedEvent; import com.googlesource.gerrit.plugins.multisite.event.EventHandler.EventTask; -import com.googlesource.gerrit.plugins.multisite.forwarder.Context; import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder; import org.junit.Before; import org.junit.Test; @@ -39,6 +38,7 @@ @RunWith(MockitoJUnitRunner.class) public class EventHandlerTest { + private static final String DEFAULT_INSTANCE_ID = "instance-id"; private EventHandler eventHandler; @@ -49,7 +49,11 @@ public void setUp() { when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(true); eventHandler = - new EventHandler(asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter); + new EventHandler( + asDynamicSet(forwarder), + MoreExecutors.directExecutor(), + projectsFilter, + DEFAULT_INSTANCE_ID); } private DynamicSet<StreamEventForwarder> asDynamicSet(StreamEventForwarder forwarder) { @@ -61,6 +65,7 @@ @Test public void shouldForwardAnyProjectEvent() throws Exception { ProjectEvent event = mock(ProjectEvent.class); + event.instanceId = DEFAULT_INSTANCE_ID; eventHandler.onEvent(event); verify(forwarder).send(event); } @@ -73,9 +78,9 @@ @Test public void shouldNotForwardIfAlreadyForwardedEvent() throws Exception { - Context.setForwardedEvent(true); - eventHandler.onEvent(mock(ProjectEvent.class)); - Context.unsetForwardedEvent(); + Event event = mock(ProjectEvent.class); + event.instanceId = "instance-id-2"; + eventHandler.onEvent(event); verifyZeroInteractions(forwarder); } @@ -84,7 +89,7 @@ when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(false); ProjectEvent event = mock(ProjectEvent.class); - + event.instanceId = DEFAULT_INSTANCE_ID; eventHandler.onEvent(event); verify(forwarder, never()).send(event); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java index 704401b..73ef353 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
@@ -14,13 +14,9 @@ package com.googlesource.gerrit.plugins.multisite.forwarder; -import static com.google.common.truth.Truth.assertThat; -import static com.google.gerrit.testing.GerritJUnit.assertThrows; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.gerrit.exceptions.StorageException; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.Event; import com.google.gerrit.server.events.EventDispatcher; @@ -33,7 +29,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) public class ForwardedEventHandlerTest { @@ -56,45 +51,4 @@ handler.dispatch(event); verify(dispatcherMock).postEvent(event); } - - @Test - public void shouldSetAndUnsetForwardedContext() throws Exception { - Event event = new ProjectCreatedEvent(); - // this doAnswer is to allow to assert that context is set to forwarded - // while cache eviction is called. 
- doAnswer( - (Answer<Void>) - invocation -> { - assertThat(Context.isForwardedEvent()).isTrue(); - return null; - }) - .when(dispatcherMock) - .postEvent(event); - - assertThat(Context.isForwardedEvent()).isFalse(); - handler.dispatch(event); - assertThat(Context.isForwardedEvent()).isFalse(); - - verify(dispatcherMock).postEvent(event); - } - - @Test - public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception { - Event event = new ProjectCreatedEvent(); - doAnswer( - (Answer<Void>) - invocation -> { - assertThat(Context.isForwardedEvent()).isTrue(); - throw new StorageException("someMessage"); - }) - .when(dispatcherMock) - .postEvent(event); - - assertThat(Context.isForwardedEvent()).isFalse(); - StorageException thrown = assertThrows(StorageException.class, () -> handler.dispatch(event)); - assertThat(thrown).hasMessageThat().isEqualTo("someMessage"); - assertThat(Context.isForwardedEvent()).isFalse(); - - verify(dispatcherMock).postEvent(event); - } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java index 917c6bf..d1697de 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -36,7 +36,6 @@ import com.google.gerrit.testing.InMemoryTestEnvironment; import com.google.inject.Inject; import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger; -import com.googlesource.gerrit.plugins.multisite.forwarder.Context; import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -49,7 +48,6 @@ import org.eclipse.jgit.lib.ObjectLoader; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.revwalk.RevCommit; -import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -60,6 +58,8 @@ @RunWith(MockitoJUnitRunner.class) public class ProjectVersionRefUpdateTest implements RefFixture { + private static final String DEFAULT_INSTANCE_ID = "instance-id"; + @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment(); @Mock RefUpdatedEvent refUpdatedEvent; @@ -70,7 +70,6 @@ @Inject private ProjectConfig.Factory projectConfigFactory; @Inject private InMemoryRepositoryManager repoManager; - private TestRepository<InMemoryRepository> repo; private ProjectConfig project; private RevCommit masterCommit; @@ -78,6 +77,7 @@ @Before public void setUp() throws Exception { when(projectsFilter.matches(any(Event.class))).thenReturn(true); + refUpdatedEvent.instanceId = DEFAULT_INSTANCE_ID; InMemoryRepository inMemoryRepo = repoManager.createRepository(A_TEST_PROJECT_NAME_KEY); project = projectConfigFactory.create(A_TEST_PROJECT_NAME_KEY); project.load(inMemoryRepo); @@ -85,14 +85,8 @@ masterCommit = repo.branch("master").commit().create(); } - @After - public void tearDown() { - Context.unsetForwardedEvent(); - } - @Test public void producerShouldUpdateProjectVersionUponRefUpdatedEvent() throws IOException { - Context.setForwardedEvent(false); when(sharedRefDb.get( A_TEST_PROJECT_NAME_KEY, ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF, @@ -111,7 +105,12 @@ 
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -130,8 +129,6 @@ @Test public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception { - Context.setForwardedEvent(false); - Thread.sleep(1000L); RevCommit masterPlusOneCommit = repo.branch("master").commit().create(); @@ -156,7 +153,12 @@ when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -176,7 +178,6 @@ @Test public void producerShouldCreateNewProjectVersionWhenMissingUponRefUpdatedEvent() throws IOException { - Context.setForwardedEvent(false); when(sharedRefDb.get( A_TEST_PROJECT_NAME_KEY, ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF, @@ -196,7 +197,12 @@ when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -233,13 +239,37 @@ private void producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(String magicRefPrefix) throws Exception { String magicRefName = magicRefPrefix + "/foo"; - Context.setForwardedEvent(false); when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY); 
when(refUpdatedEvent.getRefName()).thenReturn(magicRefName); repo.branch(magicRefName).commit().create(); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) + .onEvent(refUpdatedEvent); + + Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); + assertThat(ref).isNull(); + + verifyZeroInteractions(verLogger); + } + + @Test + public void producerShouldNotUpdateProjectVersionUponForwardedRefUpdatedEvent() + throws IOException { + refUpdatedEvent.instanceId = "instance-id-2"; + + new ProjectVersionRefUpdate( + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -250,12 +280,16 @@ @Test public void shouldNotUpdateProjectVersionWhenProjectDoesntExist() throws IOException { - Context.setForwardedEvent(false); when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject")); when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -268,8 +302,6 @@ public void shouldNotUpdateProjectVersionWhenProjectFilteredOut() throws Exception { when(projectsFilter.matches(any(Event.class))).thenReturn(false); - Context.setForwardedEvent(false); - Thread.sleep(1000L); repo.branch("master").commit().create(); @@ -277,7 +309,12 @@ repo.branch("master").update(masterCommit); new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + 
verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .onEvent(refUpdatedEvent); Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF); @@ -293,7 +330,12 @@ Optional<Long> version = new ProjectVersionRefUpdate( - repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter) + repoManager, + sharedRefDb, + gitReferenceUpdated, + verLogger, + projectsFilter, + DEFAULT_INSTANCE_ID) .getProjectRemoteVersion(A_TEST_PROJECT_NAME); assertThat(version.isPresent()).isTrue();