Merge branch 'stable-3.0' into stable-3.1
* stable-3.0:
Index account asynchronously and flush upon replication
Use separate topic for interactive indexing
Use file-based replication in local test environment
Change-Id: Idd63ec57609e39c338b943878681b3fb34d97071
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 6092cc1..f05184e 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -41,7 +41,7 @@
[plugin "kafka-events"]
bootstrapServers = localhost:$KAFKA_PORT
groupId = $KAFKA_GROUP_ID
- numberOfSubscribers = 5
+ numberOfSubscribers = 6
securityProtocol = PLAINTEXT
pollingIntervalMs = 1000
enableAutoCommit = true
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index 442c37e..9571513 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -4,6 +4,8 @@
[broker]
indexEventTopic = gerrit_index
+ batchIndexEventTopic = gerrit_batch_index
streamEventTopic = gerrit_stream
projectListEventTopic = gerrit_list_project
cacheEventTopic = gerrit_cache_eviction
+
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
new file mode 100644
index 0000000..5bbaee0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -0,0 +1,44 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.util.UUID;
+
+/**
+ * Subscriber whose topic is {@link EventTopic#BATCH_INDEX_TOPIC}.
+ *
+ * <p>Every constructor argument is handed unchanged to {@link AbstractSubcriber}; the only thing
+ * this class adds is the topic returned by {@link #getTopic()}. It is constructed with an
+ * {@link IndexEventRouter}.
+ */
+@Singleton
+public class BatchIndexEventSubscriber extends AbstractSubcriber {
+ @Inject
+ public BatchIndexEventSubscriber(
+ IndexEventRouter eventRouter,
+ DynamicSet<DroppedEventListener> droppedEventListeners,
+ @InstanceId UUID instanceId,
+ MessageLogger msgLog,
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
+ super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ }
+
+ /** @return {@link EventTopic#BATCH_INDEX_TOPIC}, the topic this subscriber is tied to. */
+ @Override
+ protected EventTopic getTopic() {
+ return EventTopic.BATCH_INDEX_TOPIC;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index afd4d09..09adb18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -28,6 +28,7 @@
DynamicSet.setOf(binder(), DroppedEventListener.class);
DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(BatchIndexEventSubscriber.class);
DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
index c00c571..0934fd5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
@@ -20,7 +20,11 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Index an account using {@link AccountIndexer}. This class is meant to be used on the receiving
@@ -31,12 +35,15 @@
@Singleton
public class ForwardedIndexAccountHandler
extends ForwardedIndexingHandler<Account.Id, AccountIndexEvent> {
+
private final AccountIndexer indexer;
+ private Map<Account.Id, Operation> accountsToIndex;
@Inject
ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
super(config.index().numStripedLocks());
this.indexer = indexer;
+ this.accountsToIndex = new HashMap<>();
}
@Override
@@ -49,4 +56,25 @@
protected void doDelete(Account.Id id, Optional<AccountIndexEvent> event) {
throw new UnsupportedOperationException("Delete from account index not supported");
}
+
+ /**
+ * Records an account for later indexing instead of indexing it right away.
+ *
+ * <p>The entry is stored in {@code accountsToIndex} keyed by account id, so a later call for the
+ * same id replaces the operation recorded earlier. Pending entries are processed by
+ * {@link #doAsyncIndex()}.
+ *
+ * @param id account to index later
+ * @param operation operation to apply to that account when the pending entries are processed
+ */
+ public synchronized void indexAsync(Account.Id id, Operation operation) {
+ accountsToIndex.put(id, operation);
+ }
+
+  /**
+   * Indexes every account recorded through {@link #indexAsync} and keeps only the failures.
+   *
+   * <p>Entries for which {@code checkedIndex} returns {@code true} are removed from
+   * {@code accountsToIndex}; entries for which it returns {@code false} stay in the map, so the
+   * next call tries them again.
+   */
+  public synchronized void doAsyncIndex() {
+    // Remove in place instead of rebuilding the map with Collectors.toMap(): that collector makes
+    // no promise about the mutability of the map it returns, while indexAsync() must still be
+    // able to put() into this field after a flush.
+    accountsToIndex.entrySet().removeIf(this::checkedIndex);
+  }
+
+ /**
+ * Indexes one pending account, turning an {@link IOException} into a {@code false} result.
+ *
+ * @param account pending entry; the key is the account id, the value the operation to apply
+ * @return {@code true} if {@code index} returned normally, {@code false} if it threw
+ *     {@link IOException} (the failure is logged)
+ */
+ private boolean checkedIndex(Map.Entry<Account.Id, Operation> account) {
+ try {
+ // No originating event is passed along for deferred entries.
+ index(account.getKey(), account.getValue(), Optional.empty());
+ return true;
+ } catch (IOException e) {
+ log.error("Account {} index failed", account.getKey(), e);
+ return false;
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 701c2fe..5c1f444 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -19,10 +19,18 @@
public interface IndexEventForwarder {
/**
- * Publish an indexing event to the broker.
+ * Publish an indexing event to the broker using the interactive topic.
*
* @param event the details of the index event.
* @return true if successful, otherwise false.
*/
boolean index(IndexEvent event);
+
+ /**
+ * Publish an indexing event to the broker using the batch topic, as opposed to
+ * {@link #index(IndexEvent)}.
+ *
+ * @param event the details of the index event.
+ * @return true if successful, otherwise false.
+ */
+ boolean batchIndex(IndexEvent event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index 0b4252d..a86c62e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -35,4 +35,9 @@
public boolean index(IndexEvent event) {
return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
}
+
+ /**
+ * Sends {@code event} through the broker to the topic that
+ * {@link EventTopic#BATCH_INDEX_TOPIC} resolves to for {@code cfg}.
+ *
+ * @return whatever {@code broker.send} returns for that topic and event
+ */
+ @Override
+ public boolean batchIndex(IndexEvent event) {
+ return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(cfg), event);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index eaa2df9..4e7a781 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -18,6 +18,7 @@
public enum EventTopic {
INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+ BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index dcaaea2..9f659ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -18,6 +18,7 @@
import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
import com.google.gerrit.entities.Account;
+import com.google.gerrit.server.config.AllUsersName;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
@@ -29,6 +30,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
import java.io.IOException;
import java.util.Optional;
@@ -37,17 +39,20 @@
private final ForwardedIndexChangeHandler indexChangeHandler;
private final ForwardedIndexGroupHandler indexGroupHandler;
private final ForwardedIndexProjectHandler indexProjectHandler;
+ private final AllUsersName allUsersName;
@Inject
public IndexEventRouter(
ForwardedIndexAccountHandler indexAccountHandler,
ForwardedIndexChangeHandler indexChangeHandler,
ForwardedIndexGroupHandler indexGroupHandler,
- ForwardedIndexProjectHandler indexProjectHandler) {
+ ForwardedIndexProjectHandler indexProjectHandler,
+ AllUsersName allUsersName) {
this.indexAccountHandler = indexAccountHandler;
this.indexChangeHandler = indexChangeHandler;
this.indexGroupHandler = indexGroupHandler;
this.indexProjectHandler = indexProjectHandler;
+ this.allUsersName = allUsersName;
}
@Override
@@ -61,8 +66,7 @@
Optional.of(changeIndexEvent));
} else if (sourceEvent instanceof AccountIndexEvent) {
AccountIndexEvent accountIndexEvent = (AccountIndexEvent) sourceEvent;
- indexAccountHandler.index(
- Account.id(accountIndexEvent.accountId), INDEX, Optional.of(accountIndexEvent));
+ indexAccountHandler.indexAsync(Account.id(accountIndexEvent.accountId), INDEX);
} else if (sourceEvent instanceof GroupIndexEvent) {
GroupIndexEvent groupIndexEvent = (GroupIndexEvent) sourceEvent;
indexGroupHandler.index(groupIndexEvent.groupUUID, INDEX, Optional.of(groupIndexEvent));
@@ -75,4 +79,15 @@
String.format("Cannot route event %s", sourceEvent.getType()));
}
}
+
+  /**
+   * Reacts to a replication-done event by (re)indexing accounts.
+   *
+   * <p>Events whose project is not {@code allUsersName} are ignored. If the replicated ref name
+   * parses to an account id through {@code Account.Id.fromRef}, that account is indexed
+   * immediately; otherwise all accounts queued in the account handler are flushed with
+   * {@code doAsyncIndex()}.
+   *
+   * @param replicationEvent the replication-done event to inspect
+   * @throws IOException propagated from the account handler's {@code index} call
+   */
+  public void onRefReplicated(RefReplicationDoneEvent replicationEvent) throws IOException {
+    if (!replicationEvent.getProjectNameKey().equals(allUsersName)) {
+      return;
+    }
+
+    Account.Id replicatedAccount = Account.Id.fromRef(replicationEvent.getRefName());
+    if (replicatedAccount == null) {
+      // Not an account ref: flush whatever has been queued so far.
+      indexAccountHandler.doAsyncIndex();
+      return;
+    }
+
+    indexAccountHandler.index(replicatedAccount, INDEX, Optional.empty());
+  }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index d911c00..4ef3426 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -18,17 +18,32 @@
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import java.io.IOException;
public class StreamEventRouter implements ForwardedEventRouter<Event> {
private final ForwardedEventHandler streamEventHandler;
+ private final IndexEventRouter indexEventRouter;
@Inject
- public StreamEventRouter(ForwardedEventHandler streamEventHandler) {
+ public StreamEventRouter(
+ ForwardedEventHandler streamEventHandler, IndexEventRouter indexEventRouter) {
this.streamEventHandler = streamEventHandler;
+ this.indexEventRouter = indexEventRouter;
}
@Override
- public void route(Event sourceEvent) throws PermissionBackendException {
+ public void route(Event sourceEvent) throws PermissionBackendException, IOException {
+ if (RefReplicationDoneEvent.TYPE.equals(sourceEvent.getType())) {
+ /* TODO: We currently ignore the status and result of the replication event on
+ * purpose, because there is no reliable way to tell whether the current node was
+ * the replication target and whether the replication succeeded.
+ *
+ * It is better to risk one redundant reindex than to miss a reindexing event.
+ */
+ indexEventRouter.onRefReplicated((RefReplicationDoneEvent) sourceEvent);
+ }
+
streamEventHandler.dispatch(sourceEvent);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index ee3ddbc..ed33efa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -98,10 +98,17 @@
private void executeIndexChangeTask(String projectName, int id) {
if (!Context.isForwardedEvent()) {
ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+
try {
checker
.newIndexEvent(projectName, id, false)
- .map(event -> new IndexChangeTask(event))
+ .map(
+ event -> {
+ if (Thread.currentThread().getName().contains("Batch")) {
+ return new BatchIndexChangeTask(event);
+ }
+ return new IndexChangeTask(event);
+ })
.ifPresent(
task -> {
if (queuedTasks.add(task)) {
@@ -164,6 +171,37 @@
}
}
+  /**
+   * Task that publishes a change index event through {@code batchIndex} on every forwarder.
+   *
+   * <p>Equality and hash code are derived solely from the wrapped {@link ChangeIndexEvent}, and
+   * two tasks are only equal when both are exactly of this class.
+   */
+  class BatchIndexChangeTask extends IndexTask {
+    private final ChangeIndexEvent changeIndexEvent;
+
+    BatchIndexChangeTask(ChangeIndexEvent changeIndexEvent) {
+      this.changeIndexEvent = changeIndexEvent;
+    }
+
+    /** Calls {@code batchIndex} with the wrapped event on each forwarder. */
+    @Override
+    public void execute() {
+      forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      // The getClass() check above means o is a BatchIndexChangeTask, so the cast has to target
+      // this class; casting to IndexChangeTask threw ClassCastException for every pair of
+      // distinct batch tasks.
+      BatchIndexChangeTask that = (BatchIndexChangeTask) o;
+      return Objects.equal(changeIndexEvent, that.changeIndexEvent);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(changeIndexEvent);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+    }
+  }
+
class IndexAccountTask extends IndexTask {
private final AccountIndexEvent accountIndexEvent;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 923abb2..31ca13d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -19,6 +19,8 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import com.google.gerrit.entities.Account;
+import com.google.gerrit.server.config.AllUsersName;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexGroupHandler;
@@ -30,6 +32,8 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@@ -45,12 +49,18 @@
@Mock private ForwardedIndexChangeHandler indexChangeHandler;
@Mock private ForwardedIndexGroupHandler indexGroupHandler;
@Mock private ForwardedIndexProjectHandler indexProjectHandler;
+ @Mock private ForwardedEventHandler forwardedEventHandler;
+ private AllUsersName allUsersName = new AllUsersName("All-Users");
@Before
public void setUp() {
router =
new IndexEventRouter(
- indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
+ indexAccountHandler,
+ indexChangeHandler,
+ indexGroupHandler,
+ indexProjectHandler,
+ allUsersName);
}
@Test
@@ -59,15 +69,30 @@
router.route(event);
verify(indexAccountHandler)
- .index(
- Account.id(event.accountId),
- ForwardedIndexingHandler.Operation.INDEX,
- Optional.of(event));
+ .indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
}
@Test
+  public void streamEventRouterShouldTriggerAccountIndexFlush() throws Exception {
+    // Stream router wired to the IndexEventRouter under test; all handlers are mocks.
+    StreamEventRouter streamRouter = new StreamEventRouter(forwardedEventHandler, router);
+
+    // Step 1: an account index event is only queued, and no other handler is touched.
+    final AccountIndexEvent accountEvent = new AccountIndexEvent(1);
+    router.route(accountEvent);
+
+    Account.Id expectedId = Account.id(accountEvent.accountId);
+    verify(indexAccountHandler).indexAsync(expectedId, ForwardedIndexingHandler.Operation.INDEX);
+    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+
+    // Step 2: a replication-done event for All-Users on "refs/any" (presumably not an account
+    // ref) must flush the queued accounts.
+    RefReplicationDoneEvent replicationDone =
+        new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1);
+    streamRouter.route(replicationDone);
+
+    verify(indexAccountHandler).doAsyncIndex();
+  }
+
+ @Test
public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
final String groupId = "12";
final GroupIndexEvent event = new GroupIndexEvent(groupId);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 605eb45..3b4bec5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -22,6 +22,7 @@
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.util.time.TimeUtil;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import org.junit.Before;
import org.junit.Test;
@@ -34,10 +35,11 @@
private StreamEventRouter router;
@Mock private ForwardedEventHandler streamEventHandler;
+ @Mock private IndexEventRouter indexEventRouter;
@Before
public void setUp() {
- router = new StreamEventRouter(streamEventHandler);
+ router = new StreamEventRouter(streamEventHandler, indexEventRouter);
}
@Test