Merge branch 'stable-3.8' into stable-3.9
* stable-3.8:
Do not propagate internal indexing operations
Change-Id: Ibe4cb5aaeaebe77c785fb42c09752a9a309edcce
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 5c443e7..e3f53ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -306,12 +306,15 @@
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
+ static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
private final int retryInterval;
private final int maxTries;
private final int numStripedLocks;
+ private final boolean synchronizeForced;
private Index(Supplier<Config> cfg) {
super(cfg, INDEX_SECTION);
@@ -322,6 +325,8 @@
maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
numStripedLocks =
getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+ synchronizeForced =
+ getBoolean(cfg, INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
public int threadPoolSize() {
@@ -339,6 +344,10 @@
public int numStripedLocks() {
return numStripedLocks;
}
+
+ public boolean synchronizeForced() {
+ return synchronizeForced;
+ }
}
public static class Broker {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
new file mode 100644
index 0000000..d6598b9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import java.util.function.Consumer;
+
+/**
+ * Runs index-event callbacks against a {@link RequestContext}.
+ *
+ * <p>When the calling thread has no request context, the callback is either skipped or run inside
+ * a freshly opened one-off context, depending on {@code index.synchronizeForced}.
+ */
+@Singleton
+public class CurrentRequestContext {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  // Collaborators are assigned once in the constructor; final documents and enforces that.
+  private final ThreadLocalRequestContext threadLocalCtx;
+  private final Configuration cfg;
+  private final OneOffRequestContext oneOffCtx;
+
+  @Inject
+  public CurrentRequestContext(
+      ThreadLocalRequestContext threadLocalCtx, Configuration cfg, OneOffRequestContext oneOffCtx) {
+    this.threadLocalCtx = threadLocalCtx;
+    this.cfg = cfg;
+    this.oneOffCtx = oneOffCtx;
+  }
+
+  /**
+   * Invokes {@code body} with a request context, if one is available or may be created.
+   *
+   * <ul>
+   *   <li>If the thread-local context is set, {@code body} receives it.
+   *   <li>If it is not set and {@code index.synchronizeForced} is true, {@code body} receives a
+   *       one-off context that is closed once {@code body} returns.
+   *   <li>If it is not set and {@code index.synchronizeForced} is false, {@code body} is not
+   *       invoked.
+   * </ul>
+   *
+   * @param body callback to run with the resolved context
+   */
+  public void onlyWithContext(Consumer<RequestContext> body) {
+    RequestContext ctx = threadLocalCtx.getContext();
+    if (ctx != null) {
+      body.accept(ctx);
+      return;
+    }
+
+    if (!cfg.index().synchronizeForced()) {
+      logger.atFine().log("No context, skipping event (index.synchronizeForced is false)");
+      return;
+    }
+
+    try (ManualRequestContext manualCtx = oneOffCtx.open()) {
+      body.accept(manualCtx);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 4369619..9aee56e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -48,6 +48,7 @@
private final ChangeCheckerImpl.Factory changeChecker;
private final GroupChecker groupChecker;
private final String instanceId;
+ private final CurrentRequestContext currCtx;
@Inject
IndexEventHandler(
@@ -55,27 +56,32 @@
DynamicSet<IndexEventForwarder> forwarders,
ChangeCheckerImpl.Factory changeChecker,
GroupChecker groupChecker,
- @GerritInstanceId String instanceId) {
+ @GerritInstanceId String instanceId,
+ CurrentRequestContext currCtx) {
this.forwarders = forwarders;
this.executor = executor;
this.changeChecker = changeChecker;
this.groupChecker = groupChecker;
this.instanceId = instanceId;
+ this.currCtx = currCtx;
}
@Override
public void onAccountIndexed(int id) {
- if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
- }
+ currCtx.onlyWithContext(
+ (ctx) -> {
+ if (!Context.isForwardedEvent()) {
+ IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
+ if (queuedTasks.add(task)) {
+ executor.execute(task);
+ }
+ }
+ });
}
@Override
public void onChangeIndexed(String projectName, int id) {
- executeIndexChangeTask(projectName, id);
+ currCtx.onlyWithContext((ctx) -> executeIndexChangeTask(projectName, id));
}
@Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 0cfd6ea..3baa911 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -44,6 +44,11 @@
service such as ElasticSearch.
Defaults to true.
+```index.synchronizeForced```
+: Whether to synchronize forced index events, i.e. those raised without a
+request context, such as the online reindex triggered automatically upon version upgrades.
+Defaults to true.
+
```index.threadPoolSize```
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 0863f55..3a24773 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -21,7 +21,9 @@
import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.DEFAULT_SYNCHRONIZE_FORCED;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.SYNCHRONIZE_FORCED_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
import com.google.common.collect.ImmutableList;
@@ -129,4 +131,13 @@
replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
assertThat(new Configuration(globalPluginConfig, replicationConfig).validate()).isNotEmpty();
}
+
+ @Test
+ public void testGetIndexSynchronizeForced() throws Exception {
+ assertThat(getConfiguration().index().synchronizeForced())
+ .isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
+
+ globalPluginConfig.setBoolean(INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, false);
+ assertThat(getConfiguration().index().synchronizeForced()).isFalse();
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index ae5851d..7e27eeb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -15,14 +15,26 @@
package com.googlesource.gerrit.plugins.multisite.index;
import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -33,21 +45,34 @@
public class IndexEventHandlerTest {
private static final String INSTANCE_ID = "instance-id";
+ private static final String PROJECT_NAME = "test_project";
+ private static final int CHANGE_ID = 1;
private IndexEventHandler eventHandler;
@Mock private IndexEventForwarder forwarder;
@Mock private ChangeCheckerImpl.Factory changeChecker;
+ @Mock private ChangeChecker changeCheckerMock;
+ @Mock private RequestContext mockCtx;
+
+ private CurrentRequestContext currCtx =
+ new CurrentRequestContext(null, null, null) {
+ @Override
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ body.accept(mockCtx);
+ }
+ };
@Before
- public void setUp() {
+ public void setUp() throws IOException {
eventHandler =
new IndexEventHandler(
MoreExecutors.directExecutor(),
asDynamicSet(forwarder),
changeChecker,
new TestGroupChecker(true),
- INSTANCE_ID);
+ INSTANCE_ID,
+ currCtx);
}
private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -62,17 +87,54 @@
String currentThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
- int changeId = 1;
Context.setForwardedEvent(false);
lenient()
.when(changeChecker.create(anyString()))
.thenThrow(
new IllegalStateException("Change indexing event should have not been triggered"));
- eventHandler.onChangeIndexed("test_project", changeId);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
verifyNoInteractions(changeChecker);
} finally {
Thread.currentThread().setName(currentThreadName);
}
}
+
+ @Test
+ public void shouldNotForwardIndexChangeWhenContextIsMissingAndForcedIndexingDisabled()
+ throws Exception {
+ eventHandler = createIndexEventHandler(changeChecker, false);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verifyNoInteractions(changeChecker);
+ verifyNoInteractions(forwarder);
+ }
+
+ @Test
+ public void shouldForwardIndexChangeWhenContextIsMissingAndForcedIndexingEnabled()
+ throws Exception {
+ when(changeChecker.create(any())).thenReturn(changeCheckerMock);
+ when(changeCheckerMock.newIndexEvent(PROJECT_NAME, CHANGE_ID, false))
+ .thenReturn(Optional.of(new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, false, INSTANCE_ID)));
+ eventHandler = createIndexEventHandler(changeChecker, true);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verify(changeCheckerMock).newIndexEvent(PROJECT_NAME, CHANGE_ID, false);
+ verify(forwarder).index(any(), any());
+ }
+
+  private IndexEventHandler createIndexEventHandler(
+      ChangeCheckerImpl.Factory changeChecker, boolean synchronizeForced) {
+    Configuration.Index indexCfg = mock(Configuration.Index.class);
+    when(indexCfg.synchronizeForced()).thenReturn(synchronizeForced); // flag under test
+    Configuration pluginCfg = mock(Configuration.class);
+    when(pluginCfg.index()).thenReturn(indexCfg);
+    ThreadLocalRequestContext tlCtx = mock(ThreadLocalRequestContext.class); // left unstubbed
+    OneOffRequestContext oneOffCtx = mock(OneOffRequestContext.class); // left unstubbed
+    return new IndexEventHandler(
+        MoreExecutors.directExecutor(),
+        asDynamicSet(forwarder),
+        changeChecker,
+        new TestGroupChecker(true),
+        INSTANCE_ID,
+        new CurrentRequestContext(tlCtx, pluginCfg, oneOffCtx));
+  }
}