Do not propagate internal indexing operations
This change brings a feature that was previously integrated into
the High Availability (HA) plugin as part of the change 245113.
Give the ability to block the propagation of the forced reindexing
events (accounts and changes) when there isn't a current request context
available.
The new behaviour is controlled by the index.synchronizeForced
configuration setting.
Bug: Issue 316786570
Change-Id: I1391d268d90b3bf1d4d43b6a9deb4ae64bb6bcda
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 5c443e7..e3f53ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -306,12 +306,15 @@
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
+ static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
private final int retryInterval;
private final int maxTries;
private final int numStripedLocks;
+ private final boolean synchronizeForced;
private Index(Supplier<Config> cfg) {
super(cfg, INDEX_SECTION);
@@ -322,6 +325,8 @@
maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
numStripedLocks =
getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+ synchronizeForced =
+ getBoolean(cfg, INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
public int threadPoolSize() {
@@ -339,6 +344,10 @@
public int numStripedLocks() {
return numStripedLocks;
}
+
+ public boolean synchronizeForced() {
+ return synchronizeForced;
+ }
}
public static class Broker {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
new file mode 100644
index 0000000..d6598b9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import java.util.function.Consumer;
+
+@Singleton
+public class CurrentRequestContext {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private ThreadLocalRequestContext threadLocalCtx;
+ private Configuration cfg;
+ private OneOffRequestContext oneOffCtx;
+
+ @Inject
+ public CurrentRequestContext(
+ ThreadLocalRequestContext threadLocalCtx, Configuration cfg, OneOffRequestContext oneOffCtx) {
+ this.threadLocalCtx = threadLocalCtx;
+ this.cfg = cfg;
+ this.oneOffCtx = oneOffCtx;
+ }
+
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ RequestContext ctx = threadLocalCtx.getContext();
+ if (ctx == null && !cfg.index().synchronizeForced()) {
+ logger.atFine().log("No context, skipping event (index.synchronizeForced is false)");
+ return;
+ }
+
+ if (ctx == null) {
+ try (ManualRequestContext manualCtx = oneOffCtx.open()) {
+ body.accept(manualCtx);
+ }
+ } else {
+ body.accept(ctx);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 4369619..9aee56e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -48,6 +48,7 @@
private final ChangeCheckerImpl.Factory changeChecker;
private final GroupChecker groupChecker;
private final String instanceId;
+ private final CurrentRequestContext currCtx;
@Inject
IndexEventHandler(
@@ -55,27 +56,32 @@
DynamicSet<IndexEventForwarder> forwarders,
ChangeCheckerImpl.Factory changeChecker,
GroupChecker groupChecker,
- @GerritInstanceId String instanceId) {
+ @GerritInstanceId String instanceId,
+ CurrentRequestContext currCtx) {
this.forwarders = forwarders;
this.executor = executor;
this.changeChecker = changeChecker;
this.groupChecker = groupChecker;
this.instanceId = instanceId;
+ this.currCtx = currCtx;
}
@Override
public void onAccountIndexed(int id) {
- if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
- }
+ currCtx.onlyWithContext(
+ (ctx) -> {
+ if (!Context.isForwardedEvent()) {
+ IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
+ if (queuedTasks.add(task)) {
+ executor.execute(task);
+ }
+ }
+ });
}
@Override
public void onChangeIndexed(String projectName, int id) {
- executeIndexChangeTask(projectName, id);
+ currCtx.onlyWithContext((ctx) -> executeIndexChangeTask(projectName, id));
}
@Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0cfd6ea..3baa911 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -44,6 +44,11 @@
service such as ElasticSearch.
Defaults to true.
+```index.synchronizeForced```
+: Whether to synchronize forced index events. E.g. on-line reindex
+automatically triggered upon version upgrades.
+Defaults to true.
+
```index.threadPoolSize```
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 0863f55..3a24773 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -21,7 +21,9 @@
import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.DEFAULT_SYNCHRONIZE_FORCED;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.SYNCHRONIZE_FORCED_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
import com.google.common.collect.ImmutableList;
@@ -129,4 +131,13 @@
replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
assertThat(new Configuration(globalPluginConfig, replicationConfig).validate()).isNotEmpty();
}
+
+ @Test
+ public void testGetIndexSynchronizeForced() throws Exception {
+ assertThat(getConfiguration().index().synchronizeForced())
+ .isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
+
+ globalPluginConfig.setBoolean(INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, false);
+ assertThat(getConfiguration().index().synchronizeForced()).isFalse();
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index ae5851d..7e27eeb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -15,14 +15,26 @@
package com.googlesource.gerrit.plugins.multisite.index;
import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -33,21 +45,34 @@
public class IndexEventHandlerTest {
private static final String INSTANCE_ID = "instance-id";
+ private static final String PROJECT_NAME = "test_project";
+ private static final int CHANGE_ID = 1;
private IndexEventHandler eventHandler;
@Mock private IndexEventForwarder forwarder;
@Mock private ChangeCheckerImpl.Factory changeChecker;
+ @Mock private ChangeChecker changeCheckerMock;
+ @Mock private RequestContext mockCtx;
+
+ private CurrentRequestContext currCtx =
+ new CurrentRequestContext(null, null, null) {
+ @Override
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ body.accept(mockCtx);
+ }
+ };
@Before
- public void setUp() {
+ public void setUp() throws IOException {
eventHandler =
new IndexEventHandler(
MoreExecutors.directExecutor(),
asDynamicSet(forwarder),
changeChecker,
new TestGroupChecker(true),
- INSTANCE_ID);
+ INSTANCE_ID,
+ currCtx);
}
private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -62,17 +87,54 @@
String currentThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
- int changeId = 1;
Context.setForwardedEvent(false);
lenient()
.when(changeChecker.create(anyString()))
.thenThrow(
new IllegalStateException("Change indexing event should have not been triggered"));
- eventHandler.onChangeIndexed("test_project", changeId);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
verifyNoInteractions(changeChecker);
} finally {
Thread.currentThread().setName(currentThreadName);
}
}
+
+ @Test
+ public void shouldNotForwardIndexChangeWhenContextIsMissingAndForcedIndexingDisabled()
+ throws Exception {
+ eventHandler = createIndexEventHandler(changeChecker, false);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verifyNoInteractions(changeChecker);
+ verifyNoInteractions(forwarder);
+ }
+
+ @Test
+ public void shouldForwardIndexChangeWhenContextIsMissingAndForcedIndexingEnabled()
+ throws Exception {
+ when(changeChecker.create(any())).thenReturn(changeCheckerMock);
+ when(changeCheckerMock.newIndexEvent(PROJECT_NAME, CHANGE_ID, false))
+ .thenReturn(Optional.of(new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, false, INSTANCE_ID)));
+ eventHandler = createIndexEventHandler(changeChecker, true);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verify(changeCheckerMock).newIndexEvent(PROJECT_NAME, CHANGE_ID, false);
+ verify(forwarder).index(any(), any());
+ }
+
+ private IndexEventHandler createIndexEventHandler(
+ ChangeCheckerImpl.Factory changeChecker, boolean synchronizeForced) {
+ ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
+ OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
+ Configuration cfgMock = mock(Configuration.class);
+ Configuration.Index cfgIndex = mock(Configuration.Index.class);
+ when(cfgMock.index()).thenReturn(cfgIndex);
+ when(cfgIndex.synchronizeForced()).thenReturn(synchronizeForced);
+ return new IndexEventHandler(
+ MoreExecutors.directExecutor(),
+ asDynamicSet(forwarder),
+ changeChecker,
+ new TestGroupChecker(true),
+ INSTANCE_ID,
+ new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
+ }
}