E2E integration test using Kafka docker container
The idea for the test is very simple:
* Set up junit test based on LightweightPluginDaemonTest
* Interfere the plugin boot process and start kafka docker container
using testcontainers library integration
* Boot the plugin with mostly default configuration, but overwrite the
kafka host and port that was exposed by kafka docker instance
* Inject drop event listener dynamic set in the test instance and
add dummy listener instance to interfer with dropped events
* Perform some actions and analyze the dropped events: it must be
exactly all events we expect to publish and receive for the operation
executed during the test method
Furture work: we should exercise all operations, so that all supported
events: index, cache eviction, stream events and project list update
events are fired and consumed.
It should be noted, that there is currently a difference if ReviewDb or
NoteDb backend is used during the test execution: the different kind of
events are fired.
When NoteDb mode is used more ref updated events are triggered.
Test Plan:
1. Run tests in ReviewDb mode:
$ bazel test plugins/multi-site:multi_site_tests
2. Run tests in NoteDb mode:
$ bazel test --test_env=GERRIT_NOTEDB=READ_WRITE \
plugins/multi-site:multi_site_tests
Requires Gerrit change https://gerrit-review.googlesource.com/c/gerrit/+/215452
Change-Id: I8946104d8ff3c1a7c40dc811543a94f3132c3bc3
diff --git a/BUILD b/BUILD
index 08d9d5c..bb8d1f2 100644
--- a/BUILD
+++ b/BUILD
@@ -45,5 +45,6 @@
"@wiremock//jar",
"@kafka_client//jar",
"@testcontainers-kafka//jar",
+ "//lib/testcontainers",
],
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 1cb7aef..acfaae2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -64,7 +65,11 @@
@Inject
Configuration(PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) {
- Config cfg = pluginConfigFactory.getGlobalPluginConfig(pluginName);
+ this(pluginConfigFactory.getGlobalPluginConfig(pluginName));
+ }
+
+ @VisibleForTesting
+ public Configuration(Config cfg) {
kafka = new Kafka(cfg);
publisher = new KafkaPublisher(cfg);
subscriber = new KafkaSubscriber(cfg);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0b4efaa..0a0fe98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -38,12 +38,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class Module extends AbstractModule {
+public class Module extends AbstractModule {
private static final Logger log = LoggerFactory.getLogger(Module.class);
private final Configuration config;
@Inject
- Module(Configuration config) {
+ public Module(Configuration config) {
this.config = config;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 3481ac9..8a3b6dc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -14,10 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
@@ -30,7 +29,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-public class BrokerForwarderModule extends AbstractModule {
+public class BrokerForwarderModule extends LifecycleModule {
private final KafkaPublisher kafkaPublisher;
public BrokerForwarderModule(KafkaPublisher kafkaPublisher) {
@@ -40,7 +39,7 @@
@Override
protected void configure() {
bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
- DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerPublisher.class);
+ listener().to(BrokerPublisher.class);
bind(BrokerSession.class).to(KafkaSession.class);
if (kafkaPublisher.enabledEvent(EventFamily.INDEX_EVENT)) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
new file mode 100644
index 0000000..98953bf
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -0,0 +1,154 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.LogThreshold;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.PushOneCommit;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Module;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Test;
+import org.testcontainers.containers.KafkaContainer;
+
+@NoHttpd
+@LogThreshold(level = "INFO")
+@TestPlugin(
+ name = "multi-site",
+ sysModule =
+ "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule")
+public class EventConsumerIT extends LightweightPluginDaemonTest {
+ private static final int QUEUE_POLL_TIMEOUT_MSECS = 2000;
+
+ public static class KafkaTestContainerModule extends LifecycleModule {
+
+ public class KafkaStopAtShutdown implements LifecycleListener {
+ private final KafkaContainer kafka;
+
+ public KafkaStopAtShutdown(KafkaContainer kafka) {
+ this.kafka = kafka;
+ }
+
+ @Override
+ public void stop() {
+ kafka.stop();
+ }
+
+ @Override
+ public void start() {
+ // Do nothing
+ }
+ }
+
+ @Override
+ protected void configure() {
+ final KafkaContainer kafka = new KafkaContainer();
+ kafka.start();
+
+ Config config = new Config();
+ config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
+ config.setBoolean("kafka", "publisher", "enabled", true);
+ config.setBoolean("kafka", "subscriber", "enabled", true);
+ Configuration multiSiteConfig = new Configuration(config);
+ bind(Configuration.class).toInstance(multiSiteConfig);
+ install(new Module(multiSiteConfig));
+
+ listener().toInstance(new KafkaStopAtShutdown(kafka));
+ }
+ }
+
+ @Test
+ public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
+ LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
+ drainQueue(droppedEventsQueue);
+
+ createChange();
+ List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 2);
+ assertThat(createdChangeEvents).hasSize(2);
+
+ assertThat(createdChangeEvents).contains("change-index");
+ assertThat(createdChangeEvents).contains("ref-updated");
+ }
+
+ @Test
+ public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
+ LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
+ PushOneCommit.Result r = createChange();
+ drainQueue(droppedEventsQueue);
+
+ ReviewInput in = ReviewInput.recommend();
+ in.message = "LGTM";
+ gApi.changes().id(r.getChangeId()).revision("current").review(in);
+
+ List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 2);
+
+ assertThat(createdChangeEvents).hasSize(2);
+ assertThat(createdChangeEvents).contains("change-index");
+ assertThat(createdChangeEvents).contains("comment-added");
+ }
+
+ private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
+ LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();
+
+ TypeLiteral<DynamicSet<DroppedEventListener>> type =
+ new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
+ plugin
+ .getSysInjector()
+ .getInstance(Key.get(type))
+ .add(
+ "multi-site",
+ new DroppedEventListener() {
+ @Override
+ public void onEventDropped(SourceAwareEventWrapper event) {
+ droppedEvents.offer(event);
+ }
+ });
+ return droppedEvents;
+ }
+
+ private List<String> receiveFromQueue(
+ LinkedBlockingQueue<SourceAwareEventWrapper> queue, int numEvents)
+ throws InterruptedException {
+ List<String> eventsList = new ArrayList<>();
+ for (int i = 0; i < numEvents; i++) {
+ SourceAwareEventWrapper event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS);
+ if (event != null) {
+ eventsList.add(event.getHeader().getEventType());
+ }
+ }
+ return eventsList;
+ }
+
+ private void drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
+ throws InterruptedException {
+ while (queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS) != null) {
+ // Just consume the event
+ }
+ }
+}