Remove EventConsumerIT test
The integration-test is way too flaky and has also
been abandoned in the subsequent releases of the multi-site
plugin.
Change-Id: Ifa27baf76735fa3c11157efbdf1e1f31b01a0721
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
deleted file mode 100644
index 98c4679..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ /dev/null
@@ -1,293 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.util.stream.Collectors.toSet;
-
-import com.google.gerrit.acceptance.AbstractDaemonTest;
-import com.google.gerrit.acceptance.GerritConfig;
-import com.google.gerrit.acceptance.NoHttpd;
-import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.extensions.api.changes.ReviewInput;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.data.PatchSetAttribute;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.PatchSetCreatedEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.query.change.ChangeData;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Key;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.Module;
-import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
-import com.googlesource.gerrit.plugins.multisite.PluginModule;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.eclipse.jgit.lib.Config;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.revwalk.RevWalk;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
-import org.junit.Test;
-import org.testcontainers.containers.KafkaContainer;
-
-@NoHttpd
-@UseLocalDisk
-public class EventConsumerIT extends AbstractDaemonTest {
- public static final String GERRIT_CONFIG_KEY = "gerrit.installModule";
- public static final String GERRIT_CONFIG_VALUE =
- "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule";
- private static final int QUEUE_POLL_TIMEOUT_MSECS = 10000;
-
- static {
- System.setProperty("gerrit.notedb", "ON");
- }
-
- public static class KafkaTestContainerModule extends LifecycleModule {
-
- public static class KafkaStopAtShutdown implements LifecycleListener {
- private final KafkaContainer kafka;
-
- public KafkaStopAtShutdown(KafkaContainer kafka) {
- this.kafka = kafka;
- }
-
- @Override
- public void stop() {
- kafka.stop();
- }
-
- @Override
- public void start() {
- // Do nothing
- }
- }
-
- public static class TestBrokerModule extends BrokerModule {
- @Override
- protected void configure() {
- DynamicItem.itemOf(binder(), BrokerApi.class);
- bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
-
- install(new SubscriberModule());
- }
- }
-
- private final FileBasedConfig config;
- private final Module multiSiteModule;
- private final PluginModule pluginModule;
-
- @Inject
- public KafkaTestContainerModule(SitePaths sitePaths, NoteDbStatus noteDb) throws IOException {
- this.config =
- new FileBasedConfig(
- sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
- config.setBoolean("kafka", "publisher", "enabled", true);
- config.setBoolean("kafka", "subscriber", "enabled", true);
- config.setBoolean("ref-database", null, "enabled", false);
- config.save();
-
- Configuration multiSiteConfig = new Configuration(config, new Config());
- this.multiSiteModule = new Module(multiSiteConfig, noteDb, new TestBrokerModule(), true);
- this.pluginModule =
- new PluginModule(
- multiSiteConfig,
- new ZkValidationModule(multiSiteConfig),
- new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
- }
-
- @Override
- protected void configure() {
- try {
- final KafkaContainer kafka = startAndConfigureKafkaConnection();
-
- listener().toInstance(new KafkaStopAtShutdown(kafka));
-
- install(multiSiteModule);
- install(pluginModule);
-
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- private KafkaContainer startAndConfigureKafkaConnection() throws IOException {
- KafkaContainer kafkaContainer = new KafkaContainer();
- kafkaContainer.start();
-
- config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
- config.save();
-
- return kafkaContainer;
- }
- }
-
- @Test
- @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
- public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
- drainQueue(droppedEventsQueue);
-
- ChangeData change = createChange().getChange();
- String project = change.project().get();
- int changeNum = change.getId().get();
- String changeNotesRef = change.notes().getRefName();
- int patchsetNum = change.currentPatchSet().getPatchSetId();
- String patchsetRevision = change.currentPatchSet().getRevision().get();
- String patchsetRef = change.currentPatchSet().getRefName();
-
- Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
- assertThat(eventsByType).isNotEmpty();
-
- assertThat(eventsByType.get("change-index"))
- .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
- assertThat(
- eventsByType.get("ref-updated").stream()
- .map(e -> ((RefUpdatedEvent) e).getRefName())
- .collect(toSet()))
- .containsAllOf(changeNotesRef, patchsetRef); // 'refs/sequences/changes'
- // not always updated thus
- // not checked
-
- List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
- assertThat(patchSetCreatedEvents).hasSize(1);
- assertPatchSetAttributes(
- (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
- patchsetNum,
- patchsetRevision,
- patchsetRef);
- }
-
- private void assertPatchSetAttributes(
- PatchSetCreatedEvent patchSetCreated,
- int patchsetNum,
- String patchsetRevision,
- String patchsetRef) {
- PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
- assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
- assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
- assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
- }
-
- @Test
- @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
- public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
- ChangeData change = createChange().getChange();
- String project = change.project().get();
- int changeNum = change.getId().get();
- drainQueue(droppedEventsQueue);
-
- ReviewInput in = ReviewInput.recommend();
- in.message = "LGTM";
- gApi.changes().id(changeNum).revision("current").review(in);
-
- Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
-
- assertThat(eventsByType).isNotEmpty();
-
- assertThat(eventsByType.get("change-index"))
- .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
- List<Event> commentAddedEvents = eventsByType.get("comment-added");
- assertThat(commentAddedEvents).hasSize(1);
- assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
- .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
- }
-
- private String getParentCommit(ChangeData change) throws Exception {
- RevCommit parent;
- try (Repository repo = repoManager.openRepository(change.project());
- RevWalk walk = new RevWalk(repo)) {
- RevCommit commit =
- walk.parseCommit(ObjectId.fromString(change.currentPatchSet().getRevision().get()));
- parent = commit.getParent(0);
- }
- return parent.getId().name();
- }
-
- private ChangeIndexEvent createChangeIndexEvent(
- String projectName, int changeId, String targetSha1) {
- ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
- event.targetSha = targetSha1;
- return event;
- }
-
- private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();
-
- TypeLiteral<DynamicSet<DroppedEventListener>> type =
- new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
- server
- .getTestInjector()
- .getInstance(Key.get(type))
- .add(
- "multi-site",
- new DroppedEventListener() {
- @Override
- public void onEventDropped(SourceAwareEventWrapper event) {
- droppedEvents.offer(event);
- }
- });
- return droppedEvents;
- }
-
- private Map<String, List<Event>> receiveEventsByType(
- LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
- return drainQueue(queue).stream()
- .sorted(Comparator.comparing(e -> e.type))
- .collect(Collectors.groupingBy(e -> e.type));
- }
-
- private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
- throws InterruptedException {
- Gson gson = server.getTestInjector().getInstance(Key.get(Gson.class, BrokerGson.class));
- SourceAwareEventWrapper event;
- List<Event> eventsList = new ArrayList<>();
- while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
- eventsList.add(event.getEventBody(gson));
- }
- return eventsList;
- }
-}