Merge branch 'stable-2.16' into stable-3.0
* stable-2.16:
Do not propagate internal indexing operations
Upgrade bazlets to latest stable-2.16 to build with 2.16.13 API
Upgrade bazlets to latest stable-2.16
Change-Id: Id7db07ce562b8f812ebf94cddc1aaf98e9beac43
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index b834b49..58a21b6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -447,12 +447,14 @@
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
+ static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
private final int retryInterval;
private final int maxTries;
-
private final int numStripedLocks;
+ private final boolean synchronizeForced;
private Index(Config cfg) {
super(cfg, INDEX_SECTION);
@@ -460,6 +462,8 @@
numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+ synchronizeForced =
+ cfg.getBoolean(INDEX_SECTION, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
public int threadPoolSize() {
@@ -477,6 +481,10 @@
public int maxTries() {
return maxTries;
}
+
+ public boolean synchronizeForced() {
+ return synchronizeForced;
+ }
}
public static class Websession extends Forwarding {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
index 54fe546..6255986 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
@@ -16,6 +16,7 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.AbstractIndexRestApiServlet;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.AbstractIndexRestApiServlet.IndexName;
+import com.ericsson.gerrit.plugins.highavailability.index.CurrentRequestContext;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.events.AccountIndexedListener;
@@ -49,6 +50,7 @@
private final ScheduledExecutorService exec;
private final FlusherRunner flusher;
private final ChangeFinder changeFinder;
+ private final CurrentRequestContext currCtx;
private volatile LocalDateTime changeTs;
private volatile LocalDateTime accountTs;
@@ -79,45 +81,53 @@
}
@Inject
- public IndexTs(@PluginData Path dataDir, WorkQueue queue, ChangeFinder changeFinder) {
+ public IndexTs(
+ @PluginData Path dataDir,
+ WorkQueue queue,
+ ChangeFinder changeFinder,
+ CurrentRequestContext currCtx) {
this.dataDir = dataDir;
this.exec = queue.getDefaultQueue();
this.flusher = new FlusherRunner();
this.changeFinder = changeFinder;
+ this.currCtx = currCtx;
}
@Override
public void onProjectIndexed(String project) {
- update(IndexName.PROJECT, LocalDateTime.now());
+ currCtx.onlyWithContext((ctx) -> update(IndexName.PROJECT, LocalDateTime.now()));
}
@Override
public void onGroupIndexed(String uuid) {
- update(IndexName.GROUP, LocalDateTime.now());
+ currCtx.onlyWithContext((ctx) -> update(IndexName.GROUP, LocalDateTime.now()));
}
@Override
public void onAccountIndexed(int id) {
- update(IndexName.ACCOUNT, LocalDateTime.now());
+ currCtx.onlyWithContext((ctx) -> update(IndexName.ACCOUNT, LocalDateTime.now()));
}
@Override
public void onChangeIndexed(String projectName, int id) {
- try {
- ChangeNotes changeNotes = changeFinder.findOne(projectName + "~" + id);
- update(
- IndexName.CHANGE,
- changeNotes == null
- ? LocalDateTime.now()
- : changeNotes.getChange().getLastUpdatedOn().toLocalDateTime());
- } catch (Exception e) {
- log.atWarning().withCause(e).log("Unable to update the latest TS for change %d", id);
- }
+ currCtx.onlyWithContext(
+ (ctx) -> {
+ try {
+ ChangeNotes changeNotes = changeFinder.findOne(projectName + "~" + id);
+ update(
+ IndexName.CHANGE,
+ changeNotes == null
+ ? LocalDateTime.now()
+ : changeNotes.getChange().getLastUpdatedOn().toLocalDateTime());
+ } catch (Exception e) {
+ log.atWarning().withCause(e).log("Unable to update the latest TS for change %d", id);
+ }
+ });
}
@Override
public void onChangeDeleted(int id) {
- update(IndexName.CHANGE, LocalDateTime.now());
+ currCtx.onlyWithContext((ctx) -> update(IndexName.CHANGE, LocalDateTime.now()));
}
public Optional<LocalDateTime> getUpdateTs(AbstractIndexRestApiServlet.IndexName index) {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/CurrentRequestContext.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/CurrentRequestContext.java
new file mode 100644
index 0000000..46aadbb
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/CurrentRequestContext.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.function.Consumer;
+
+/**
+ * Runs index-event callbacks with a {@link RequestContext} available.
+ *
+ * <p>When the calling thread already has a request context, the callback
+ * receives it. When there is none, the callback is either skipped (if
+ * {@code index.synchronizeForced} is false) or run inside a context opened
+ * from {@link OneOffRequestContext}, which is closed when the callback returns.
+ */
+@Singleton
+public class CurrentRequestContext {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  // Collaborators are assigned once in the constructor and never replaced.
+  private final ThreadLocalRequestContext threadLocalCtx;
+  private final Configuration cfg;
+  private final OneOffRequestContext oneOffCtx;
+
+  @Inject
+  public CurrentRequestContext(
+      ThreadLocalRequestContext threadLocalCtx, Configuration cfg, OneOffRequestContext oneOffCtx) {
+    this.threadLocalCtx = threadLocalCtx;
+    this.cfg = cfg;
+    this.oneOffCtx = oneOffCtx;
+  }
+
+  /**
+   * Invokes {@code body} with a request context, or skips it when none applies.
+   *
+   * @param body callback to run; it receives the thread's current context or,
+   *     if there is none and {@code index.synchronizeForced} is true, a one-off context
+   */
+  public void onlyWithContext(Consumer<RequestContext> body) {
+    RequestContext ctx = threadLocalCtx.getContext();
+    if (ctx != null) {
+      body.accept(ctx);
+      return;
+    }
+
+    if (!cfg.index().synchronizeForced()) {
+      logger.atFine().log("No context, skipping event (index.synchronizeForced is false)");
+      return;
+    }
+
+    // No context on this thread: run the callback inside a temporary one.
+    try (ManualRequestContext manualCtx = oneOffCtx.open()) {
+      body.accept(manualCtx);
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 273013d..0697ad7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -41,31 +41,42 @@
private final String pluginName;
private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ChangeCheckerImpl.Factory changeChecker;
+ private final CurrentRequestContext currCtx;
@Inject
IndexEventHandler(
@IndexExecutor Executor executor,
@PluginName String pluginName,
Forwarder forwarder,
- ChangeCheckerImpl.Factory changeChecker) {
+ ChangeCheckerImpl.Factory changeChecker,
+ CurrentRequestContext currCtx) {
this.forwarder = forwarder;
this.executor = executor;
this.pluginName = pluginName;
this.changeChecker = changeChecker;
+ this.currCtx = currCtx;
}
@Override
public void onAccountIndexed(int id) {
- if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(id);
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
- }
+ currCtx.onlyWithContext(
+ (ctx) -> {
+ if (!Context.isForwardedEvent()) {
+ IndexAccountTask task = new IndexAccountTask(id);
+ if (queuedTasks.add(task)) {
+ executor.execute(task);
+ }
+ }
+ });
}
@Override
public void onChangeIndexed(String projectName, int id) {
+ currCtx.onlyWithContext((ctx) -> executeIndexChangeTask(projectName, id));
+ }
+
+ private void executeIndexChangeTask(String projectName, int id) {
+
if (!Context.isForwardedEvent()) {
String changeId = projectName + "~" + id;
try {
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index bb72501..e79cbdd 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -191,6 +191,11 @@
: Whether to synchronize secondary indexes.
Defaults to true.
+```index.synchronizeForced```
+: Whether to synchronize forced index events, e.g. the online reindex that
+ is automatically triggered upon version upgrades.
+ Defaults to true.
+
```index.threadPoolSize```
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index 1b67a3a..e28f8c9 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -34,7 +34,9 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.RETRY_INTERVAL_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.SOCKET_TIMEOUT_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.USER_KEY;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.Index.DEFAULT_SYNCHRONIZE_FORCED;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Index.INDEX_SECTION;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.Index.SYNCHRONIZE_FORCED_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups.CLUSTER_NAME_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups.DEFAULT_CLUSTER_NAME;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups.DEFAULT_SKIP_INTERFACE_LIST;
@@ -387,4 +389,13 @@
globalPluginConfig.setInt(INDEX_SECTION, null, NUM_STRIPED_LOCKS, 100);
assertThat(getConfiguration().index().numStripedLocks()).isEqualTo(100);
}
+
+ @Test
+ public void testGetIndexSynchronizeForced() throws Exception {
+ assertThat(getConfiguration().index().synchronizeForced())
+ .isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
+
+ globalPluginConfig.setBoolean(INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, false);
+ assertThat(getConfiguration().index().synchronizeForced()).isFalse();
+ }
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index d9e1b22..2d12ca8 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -18,11 +18,13 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
@@ -34,8 +36,12 @@
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.AccountGroup;
import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -58,6 +64,15 @@
private Change.Id changeId;
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
+ @Mock private RequestContext mockCtx;
+
+ private CurrentRequestContext currCtx =
+ new CurrentRequestContext(null, null, null) {
+ @Override
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ body.accept(mockCtx);
+ }
+ };
@Before
public void setUpMocks() throws Exception {
@@ -66,9 +81,18 @@
accountGroupUUID = new AccountGroup.UUID(UUID);
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
+
+ setUpIndexEventHandler(currCtx);
+ }
+
+ public void setUpIndexEventHandler(CurrentRequestContext currCtx) throws Exception {
indexEventHandler =
new IndexEventHandler(
- MoreExecutors.directExecutor(), PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
+ MoreExecutors.directExecutor(),
+ PLUGIN_NAME,
+ forwarder,
+ changeCheckerFactoryMock,
+ currCtx);
}
@Test
@@ -78,6 +102,33 @@
}
@Test
+ public void shouldNotIndexInRemoteWhenContextIsMissing() throws Exception {
+ ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
+ OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
+ Configuration cfgMock = mock(Configuration.class);
+ Configuration.Index cfgIndex = mock(Configuration.Index.class);
+ when(cfgMock.index()).thenReturn(cfgIndex);
+
+ setUpIndexEventHandler(new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+ verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldReindexInRemoteWhenContextIsMissingButForcedIndexingEnabled() throws Exception {
+ ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
+ OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
+ Configuration cfgMock = mock(Configuration.class);
+ Configuration.Index cfgIndex = mock(Configuration.Index.class);
+ when(cfgMock.index()).thenReturn(cfgIndex);
+ when(cfgIndex.synchronizeForced()).thenReturn(true);
+
+ setUpIndexEventHandler(new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+ verify(forwarder).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
public void shouldIndexInRemoteOnAccountIndexedEvent() throws Exception {
indexEventHandler.onAccountIndexed(accountId.get());
verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
@@ -128,7 +179,7 @@
public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(poolMock, times(1))
@@ -139,7 +190,7 @@
public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
indexEventHandler.onAccountIndexed(accountId.get());
indexEventHandler.onAccountIndexed(accountId.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
@@ -149,7 +200,7 @@
public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));