Add JUnit test for Kafka events consumer

Use the LightweightPluginDaemonTest class infrastructure to boot the
Gerrit server and deploy the plugin. The test assumes that a full
Kafka server is up and running prior to test execution.

An alternative approach (not done in this change) is to use the
testcontainers library with its provided KafkaContainer [1] and start
a dockerized Kafka instance from within the JUnit test's setUp method.
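
A rough sketch of that alternative, assuming the KafkaContainer API
shown in [1] (illustrative only; wiring the container's bootstrap
servers into the plugin config is left out):

  import org.testcontainers.containers.KafkaContainer;

  private KafkaContainer kafka;

  @Before
  public void setUp() {
    // Start a dockerized Kafka broker before the test runs.
    kafka = new KafkaContainer();
    kafka.start();
    // kafka.getBootstrapServers() would then replace localhost:9092.
  }

  @After
  public void tearDown() {
    kafka.stop();
  }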

Test Plan:

1. Run Kafka Server:
  $ docker run -p 2181:2181 -p 9092:9092 -d -e ADVERTISED_HOST=127.0.0.1 -e ADVERTISED_PORT=9092 spotify/kafka
2. Run the test:
  $ bazel test plugins/kafka-events:kafka_events_tests

[1] https://github.com/testcontainers/testcontainers-java/blob/master\
/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

Change-Id: Ia61868acc50d47b742126208fd078076dc2c2a94
diff --git a/BUILD b/BUILD
index c5ddedd..dca6dc1 100644
--- a/BUILD
+++ b/BUILD
@@ -1,4 +1,10 @@
-load("//tools/bzl:plugin.bzl", "gerrit_plugin")
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+    "//tools/bzl:plugin.bzl",
+    "PLUGIN_DEPS",
+    "PLUGIN_TEST_DEPS",
+    "gerrit_plugin",
+)
 
 gerrit_plugin(
     name = "kafka-events",
@@ -14,3 +20,22 @@
         "@kafka_client//jar",
     ],
 )
+
+junit_tests(
+    name = "kafka_events_tests",
+    srcs = glob(["src/test/java/**/*.java"]),
+    tags = ["kafka-events"],
+    deps = [
+        ":kafka-events__plugin_test_deps",
+        "@kafka_client//jar",
+    ],
+)
+
+java_library(
+    name = "kafka-events__plugin_test_deps",
+    testonly = 1,
+    visibility = ["//visibility:public"],
+    exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+        ":kafka-events__plugin",
+    ],
+)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index ad79bca..f5782b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -47,6 +47,7 @@
     put("buffer.memory", 33554432);
     put("key.serializer", KAFKA_STRING_SERIALIZER);
     put("value.serializer", KAFKA_STRING_SERIALIZER);
+    put("reconnect.backoff.ms", 5000L);
   }
 
   private void applyConfig(PluginConfig config) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
new file mode 100644
index 0000000..997428e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -0,0 +1,112 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Supplier;
+import com.google.gerrit.acceptance.GerritConfig;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.PushOneCommit;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import com.google.gerrit.extensions.common.ChangeMessageInfo;
+import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDeserializer;
+import com.google.gerrit.server.events.SupplierDeserializer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+
+/*
+ * This test assumes that a Kafka server is running on localhost:9092.
+ * Alternatively, the testcontainers library can be used to set up a
+ * dockerized Kafka instance from within the JUnit test.
+ */
+@NoHttpd
+@TestPlugin(name = "kafka-events", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
+public class EventConsumerIT extends LightweightPluginDaemonTest {
+  static final long KAFKA_POLL_TIMEOUT = 10000L;
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.kafka-events.bootstrapServers", value = "localhost:9092")
+  @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.kafka-events.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.kafka-events.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  public void consumeEvents() throws Exception {
+    PushOneCommit.Result r = createChange();
+
+    ReviewInput in = ReviewInput.recommend();
+    in.message = "LGTM";
+    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+    List<ChangeMessageInfo> messages =
+        new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages);
+    assertThat(messages).hasSize(2);
+    String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM";
+    assertThat(messages.get(1).message).isEqualTo(expectedMessage);
+
+    KafkaProperties kafkaProperties = kafkaProperties();
+    List<String> events = new ArrayList<>();
+    try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
+      consumer.subscribe(Collections.singleton(kafkaProperties.getTopic()));
+      ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT);
+      for (ConsumerRecord<String, String> record : records) {
+        events.add(record.value());
+      }
+    }
+
+    // The 6 events received, in order:
+    //
+    // 1. refUpdate:        ref: refs/sequences/changes
+    // 2. refUpdate:        ref: refs/changes/01/1/1
+    // 3. refUpdate:        ref: refs/changes/01/1/meta
+    // 4. patchset-created: ref: refs/changes/01/1/1
+    // 5. refUpdate:        ref: refs/changes/01/1/meta
+    // 6. comment-added:    ref: refs/heads/master
+    assertThat(events).hasSize(6);
+    String commentAddedEventJson = events.get(5);
+
+    Gson gson =
+        new GsonBuilder()
+            .registerTypeAdapter(Event.class, new EventDeserializer())
+            .registerTypeAdapter(Supplier.class, new SupplierDeserializer())
+            .create();
+    Event event = gson.fromJson(commentAddedEventJson, Event.class);
+    assertThat(event).isInstanceOf(CommentAddedEvent.class);
+
+    CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event;
+    assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage);
+  }
+
+  private KafkaProperties kafkaProperties() {
+    return plugin.getSysInjector().getInstance(KafkaProperties.class);
+  }
+}