| // Copyright (C) 2018 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.kafka; |
| |
| import static com.google.common.truth.Truth.assertThat; |
| |
| import com.google.common.base.Supplier; |
| import com.google.gerrit.acceptance.GerritConfig; |
| import com.google.gerrit.acceptance.LightweightPluginDaemonTest; |
| import com.google.gerrit.acceptance.NoHttpd; |
| import com.google.gerrit.acceptance.PushOneCommit; |
| import com.google.gerrit.acceptance.TestPlugin; |
| import com.google.gerrit.acceptance.UseLocalDisk; |
| import com.google.gerrit.extensions.api.changes.ReviewInput; |
| import com.google.gerrit.extensions.common.ChangeMessageInfo; |
| import com.google.gerrit.server.events.CommentAddedEvent; |
| import com.google.gerrit.server.events.Event; |
| import com.google.gerrit.server.events.EventDeserializer; |
| import com.google.gerrit.server.events.SupplierDeserializer; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import org.apache.kafka.clients.consumer.Consumer; |
| import org.apache.kafka.clients.consumer.ConsumerRecord; |
| import org.apache.kafka.clients.consumer.ConsumerRecords; |
| import org.apache.kafka.clients.consumer.KafkaConsumer; |
| import org.junit.Test; |
| |
| /* |
| * This tests assumes that Kafka server is running on: localhost:9092. |
| * Alternatively, testcontainers library can be used to set up dockerized |
| * Kafka instance from withing JUnit test. |
| */ |
| @NoHttpd |
| @TestPlugin(name = "kafka-events", sysModule = "com.googlesource.gerrit.plugins.kafka.Module") |
| public class EventConsumerIT extends LightweightPluginDaemonTest { |
| static final long KAFKA_POLL_TIMEOUT = 10000L; |
| |
| @Test |
| @UseLocalDisk |
| @GerritConfig(name = "plugin.kafka-events.bootstrapServers", value = "localhost:9092") |
| @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group") |
| @GerritConfig( |
| name = "plugin.kafka-events.keyDeserializer", |
| value = "org.apache.kafka.common.serialization.StringDeserializer") |
| @GerritConfig( |
| name = "plugin.kafka-events.valueDeserializer", |
| value = "org.apache.kafka.common.serialization.StringDeserializer") |
| public void consumeEvents() throws Exception { |
| PushOneCommit.Result r = createChange(); |
| |
| ReviewInput in = ReviewInput.recommend(); |
| in.message = "LGTM"; |
| gApi.changes().id(r.getChangeId()).revision("current").review(in); |
| List<ChangeMessageInfo> messages = |
| new ArrayList<>(gApi.changes().id(r.getChangeId()).get().messages); |
| assertThat(messages).hasSize(2); |
| String expectedMessage = "Patch Set 1: Code-Review+1\n\nLGTM"; |
| assertThat(messages.get(1).message).isEqualTo(expectedMessage); |
| |
| KafkaProperties kafkaProperties = kafkaProperties(); |
| List<String> events = new ArrayList<>(); |
| try (Consumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) { |
| consumer.subscribe(Collections.singleton(kafkaProperties.getTopic())); |
| ConsumerRecords<String, String> records = consumer.poll(KAFKA_POLL_TIMEOUT); |
| for (ConsumerRecord<String, String> record : records) { |
| events.add(record.value()); |
| } |
| } |
| |
| // The received 6 events in order: |
| // |
| // 1. refUpdate: ref: refs/sequences/changes |
| // 2. refUpdate: ref: refs/changes/01/1/1 |
| // 3. refUpdate: ref: refs/changes/01/1/meta |
| // 4. patchset-created: ref: refs/changes/01/1/1 |
| // 5. refUpdate: ref: refs/changes/01/1/meta" |
| // 6. comment-added: ref: refs/heads/master |
| assertThat(events).hasSize(6); |
| String commentAddedEventJson = events.get(5); |
| |
| Gson gson = |
| new GsonBuilder() |
| .registerTypeAdapter(Event.class, new EventDeserializer()) |
| .registerTypeAdapter(Supplier.class, new SupplierDeserializer()) |
| .create(); |
| Event event = gson.fromJson(commentAddedEventJson, Event.class); |
| assertThat(event).isInstanceOf(CommentAddedEvent.class); |
| |
| CommentAddedEvent commentAddedEvent = (CommentAddedEvent) event; |
| assertThat(commentAddedEvent.comment).isEqualTo(expectedMessage); |
| } |
| |
| private KafkaProperties kafkaProperties() { |
| return plugin.getSysInjector().getInstance(KafkaProperties.class); |
| } |
| } |