Start dockerized Kafka server from JUnit test using testcontainers

Use the testcontainers library to spawn a dockerized Kafka server from
within the JUnit test.

One challenge to solve is bootstrapping the client initialization
properties from the dockerized Kafka server: there is no way to inject
custom values into the Gerrit configuration after the Docker instance
has been started by the testcontainers library. To overcome this, the
test sets a special system property and the production code checks for
that property. If present, the test-specific properties are injected
into the Kafka client configuration.
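
A minimal sketch of the idea (the actual implementation is in the diff
below; the snippet only illustrates the hand-off via the system
property):

  // In the JUnit test, before the plugin starts:
  KafkaContainer kafka = new KafkaContainer();
  kafka.start();
  System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());

  // In KafkaProperties (production code), during initialization:
  String bootstrap = System.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
  if (!Strings.isNullOrEmpty(bootstrap)) {
    // Test environment detected: point the clients at the dockerized broker.
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
  }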

Test Plan:

1. Start docker daemon:

  $ sudo service docker start

2. Run tests:

  $ bazel test plugins/kafka-events:kafka_events_tests

Change-Id: Iaca0901410460bbbfa731b4da1625a9600402e08
diff --git a/BUILD b/BUILD
index dca6dc1..db2462f 100644
--- a/BUILD
+++ b/BUILD
@@ -27,7 +27,9 @@
     tags = ["kafka-events"],
     deps = [
         ":kafka-events__plugin_test_deps",
+        "//lib/testcontainers",
         "@kafka_client//jar",
+        "@testcontainers-kafka//jar",
     ],
 )
 
@@ -37,5 +39,7 @@
     visibility = ["//visibility:public"],
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":kafka-events__plugin",
+        "@testcontainers-kafka//jar",
+        "//lib/testcontainers",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 33df632..6a51cf5 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -6,3 +6,9 @@
         artifact = "org.apache.kafka:kafka-clients:0.10.0.1",
         sha1 = "36ebf4044d0e546bf74c95629d736ca63320a323",
     )
+
+    maven_jar(
+        name = "testcontainers-kafka",
+        artifact = "org.testcontainers:kafka:1.8.0",
+        sha1 = "1d6f0a529dd87bc66bc68a09b8b58ed17ebf33f6",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index f5782b7..c01e34e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -15,11 +15,15 @@
 package com.googlesource.gerrit.plugins.kafka.config;
 
 import com.google.common.base.CaseFormat;
+import com.google.common.base.Strings;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.server.config.PluginConfig;
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 @Singleton
@@ -37,6 +41,7 @@
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
     topic = fromGerritConfig.getString("topic", "gerrit");
     applyConfig(fromGerritConfig);
+    initDockerizedKafkaServer();
   }
 
   private void setDefaults() {
@@ -59,6 +64,18 @@
     }
   }
 
+  /** Bootstrap initialization of dockerized Kafka server environment */
+  private void initDockerizedKafkaServer() {
+    String testBootstrapServer = System.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+    if (!Strings.isNullOrEmpty(testBootstrapServer)) {
+      this.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testBootstrapServer);
+      this.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+      this.put(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID());
+      this.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
+      this.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    }
+  }
+
   public String getTopic() {
     return topic;
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 997428e..9007e91 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -39,21 +39,41 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Test;
+import org.testcontainers.containers.KafkaContainer;
 
-/*
- * This tests assumes that Kafka server is running on: localhost:9092.
- * Alternatively, testcontainers library can be used to set up dockerized
- * Kafka instance from withing JUnit test.
- */
 @NoHttpd
 @TestPlugin(name = "kafka-events", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
   static final long KAFKA_POLL_TIMEOUT = 10000L;
 
+  private KafkaContainer kafka;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    try {
+      kafka = new KafkaContainer();
+      kafka.start();
+
+      System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    } catch (IllegalStateException e) {
+      fail("Cannot start container. Is docker daemon running?");
+    }
+
+    super.setUpTestPlugin();
+  }
+
+  @Override
+  public void tearDownTestPlugin() {
+    super.tearDownTestPlugin();
+    if (kafka != null) {
+      kafka.stop();
+    }
+  }
+
   @Test
   @UseLocalDisk
-  @GerritConfig(name = "plugin.kafka-events.bootstrapServers", value = "localhost:9092")
   @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group")
   @GerritConfig(
       name = "plugin.kafka-events.keyDeserializer",
@@ -73,8 +93,8 @@
     String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
     assertThat(messages.get(1).message).isEqualTo(expectedMessage);
 
-    KafkaProperties kafkaProperties = kafkaProperties();
     List<String> events = new ArrayList<>();
+    KafkaProperties kafkaProperties = kafkaProperties();
     try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
       consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
       ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);