Move kafka related code from multi-site plugin to kafka-events plugin
Kafka broker implementation is provided as a plugin.
Multi-site plugin is message broker implementation agnostic.
This separation allows replacing message broker implementation.
Feature: Issue 11599
Change-Id: Id35f0a8fe6e1d43cd893e7b9521c2c67f0160ed9
diff --git a/BUILD b/BUILD
index db2462f..6b11272 100644
--- a/BUILD
+++ b/BUILD
@@ -17,7 +17,8 @@
],
resources = glob(["src/main/resources/**/*"]),
deps = [
- "@kafka_client//jar",
+ "@kafka-client//jar",
+ "@events-broker//jar",
],
)
@@ -28,7 +29,8 @@
deps = [
":kafka-events__plugin_test_deps",
"//lib/testcontainers",
- "@kafka_client//jar",
+ "@kafka-client//jar",
+ "@events-broker//jar",
"@testcontainers-kafka//jar",
],
)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bebcc4e..bfbf487 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -2,7 +2,7 @@
def external_plugin_deps():
maven_jar(
- name = "kafka_client",
+ name = "kafka-client",
artifact = "org.apache.kafka:kafka-clients:2.1.0",
sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
)
@@ -12,3 +12,9 @@
artifact = "org.testcontainers:kafka:1.10.6",
sha1 = "5984e31306bd6c84a36092cdd19e0ef7e2268d98",
)
+
+ maven_jar(
+ name = "events-broker",
+ artifact = "com.gerritforge:events-broker:3.0.3",
+ sha1 = "efdc5bf6897563e2f6f85bfc1b8a5d65e3393424",
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index b20b312..4dc394b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -14,28 +14,39 @@
package com.googlesource.gerrit.plugins.kafka;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import java.util.Set;
@Singleton
public class Manager implements LifecycleListener {
private final KafkaPublisher publisher;
+ private final Set<TopicSubscriber> consumers;
+ private final BrokerApi brokerApi;
@Inject
- public Manager(KafkaPublisher publisher) {
+ public Manager(KafkaPublisher publisher, Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
this.publisher = publisher;
+ this.consumers = consumers;
+ this.brokerApi = brokerApi;
}
@Override
public void start() {
publisher.start();
+ consumers.forEach(
+ topicSubscriber ->
+ brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
}
@Override
public void stop() {
publisher.stop();
+ brokerApi.disconnect();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 7f219d3..c9a28ba 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,21 +14,32 @@
package com.googlesource.gerrit.plugins.kafka;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.kafka.publish.GsonProvider;
+import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
class Module extends AbstractModule {
+ private final KafkaApiModule kafkaBrokerModule;
+
+ @Inject
+ public Module(KafkaApiModule kafkaBrokerModule) {
+ this.kafkaBrokerModule = kafkaBrokerModule;
+ }
+
@Override
protected void configure() {
- bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
+ bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+
+ install(kafkaBrokerModule);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
new file mode 100644
index 0000000..73c7509
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -0,0 +1,69 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventDeserializer;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+public class KafkaApiModule extends LifecycleModule {
+ private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+ private WorkQueue workQueue;
+ private KafkaSubscriberProperties configuration;
+
+ @Inject
+ public KafkaApiModule(WorkQueue workQueue, KafkaSubscriberProperties configuration) {
+ this.workQueue = workQueue;
+ this.configuration = configuration;
+ }
+
+ @Inject(optional = true)
+ public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+ if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+ this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+ }
+ }
+
+ @Override
+ protected void configure() {
+
+ bind(ExecutorService.class)
+ .annotatedWith(ConsumerExecutor.class)
+ .toInstance(
+ workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber"));
+
+ bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+ bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class);
+ bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+
+ DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
new file mode 100644
index 0000000..78df908
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -0,0 +1,72 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class KafkaBrokerApi implements BrokerApi {
+
+ private final KafkaPublisher publisher;
+ private final Provider<KafkaEventSubscriber> subscriberProvider;
+ private List<KafkaEventSubscriber> subscribers;
+
+ @Inject
+ public KafkaBrokerApi(
+ KafkaPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
+ this.publisher = publisher;
+ this.subscriberProvider = subscriberProvider;
+ subscribers = new ArrayList<>();
+ }
+
+ @Override
+ public boolean send(String topic, EventMessage event) {
+ return publisher.publish(topic, event);
+ }
+
+ @Override
+ public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+ KafkaEventSubscriber subscriber = subscriberProvider.get();
+ synchronized (subscribers) {
+ subscribers.add(subscriber);
+ }
+ subscriber.subscribe(topic, eventConsumer);
+ }
+
+ @Override
+ public void disconnect() {
+ for (KafkaEventSubscriber subscriber : subscribers) {
+ subscriber.shutdown();
+ }
+ subscribers.clear();
+ }
+
+ @Override
+ public Set<TopicSubscriber> topicSubscribers() {
+ return subscribers.stream()
+ .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java
new file mode 100644
index 0000000..eb878fa
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/broker/ConsumerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index c01e34e..7bb1f4f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -79,4 +79,8 @@
public String getTopic() {
return topic;
}
+
+ public String getBootstrapServers() {
+ return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java
new file mode 100644
index 0000000..17f3ebd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPublisherProperties.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.config;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+
+public class KafkaPublisherProperties extends KafkaProperties {
+ private static final long serialVersionUID = 0L;
+
+ @Inject
+ public KafkaPublisherProperties(
+ PluginConfigFactory configFactory, @PluginName String pluginName) {
+ super(configFactory, pluginName);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
new file mode 100644
index 0000000..b50053f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.config;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+
+public class KafkaSubscriberProperties extends KafkaProperties {
+ private static final long serialVersionUID = 1L;
+ private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
+ private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "4";
+
+ private final Integer pollingInterval;
+ private final String groupId;
+ private final Integer numberOfSubscribers;
+
+ @Inject
+ public KafkaSubscriberProperties(
+ PluginConfigFactory configFactory, @PluginName String pluginName) {
+ super(configFactory, pluginName);
+
+ this.pollingInterval =
+ Integer.parseInt(getProperty("pollingIntervalMs", DEFAULT_POLLING_INTERVAL_MS));
+ this.groupId = getProperty("groupId");
+ this.numberOfSubscribers =
+ Integer.parseInt(getProperty("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+ }
+
+ public Integer getPollingInterval() {
+ return pollingInterval;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public Integer getNumberOfSubscribers() {
+ return numberOfSubscribers;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
deleted file mode 100644
index 2c5c1e7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.publish;
-
-import com.google.common.base.Supplier;
-import com.google.gerrit.server.events.SupplierSerializer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Provider;
-
-public class GsonProvider implements Provider<Gson> {
-
- @Override
- public Gson get() {
- return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index 21b89ce..cc271b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,9 +14,12 @@
package com.googlesource.gerrit.plugins.kafka.publish;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
@@ -49,4 +52,17 @@
session.publish(gson.toJson(event));
}
}
+
+ public boolean publish(String topic, EventMessage event) {
+ return session.publish(topic, getPayload(event));
+ }
+
+ private String getPayload(EventMessage event) {
+ return gson.toJson(event);
+ }
+
+ @VisibleForTesting
+ public JsonObject eventToJson(Event event) {
+ return gson.toJsonTree(event).getAsJsonObject();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index fcb5263..52b242a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -16,9 +16,12 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +73,19 @@
}
public void publish(String messageBody) {
- producer.send(new ProducerRecord<>(properties.getTopic(), "" + System.nanoTime(), messageBody));
+ publish(properties.getTopic(), messageBody);
+ }
+
+ public boolean publish(String topic, String messageBody) {
+ Future<RecordMetadata> future =
+ producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
+ try {
+ RecordMetadata metadata = future.get();
+ LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+ return true;
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Cannot send the message", e);
+ return false;
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
new file mode 100644
index 0000000..9ec109d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+class KafkaConsumerFactory {
+ private KafkaSubscriberProperties config;
+
+ @Inject
+ public KafkaConsumerFactory(KafkaSubscriberProperties configuration) {
+ this.config = configuration;
+ }
+
+ public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer) {
+ return new KafkaConsumer<>(config, keyDeserializer, new ByteArrayDeserializer());
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
new file mode 100644
index 0000000..bab2ad0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+@Singleton
+public class KafkaEventDeserializer implements Deserializer<EventMessage> {
+
+ private final StringDeserializer stringDeserializer = new StringDeserializer();
+ private Gson gson;
+
+ // To be used when providing this deserializer with class name (then need to add a configuration
+ // entry to set the gson.provider
+ public KafkaEventDeserializer() {}
+
+ @Inject
+ public KafkaEventDeserializer(Gson gson) {
+ this.gson = gson;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {}
+
+ @Override
+ public EventMessage deserialize(String topic, byte[] data) {
+ final EventMessage result =
+ gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
+ result.validate();
+
+ return result;
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
new file mode 100644
index 0000000..6548f2d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -0,0 +1,134 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventSubscriber {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Consumer<byte[], byte[]> consumer;
+ private final OneOffRequestContext oneOffCtx;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private final Deserializer<EventMessage> valueDeserializer;
+ private final KafkaSubscriberProperties configuration;
+ private final ExecutorService executor;
+ private final KafkaEventSubscriberMetrics subscriberMetrics;
+
+ private java.util.function.Consumer<EventMessage> messageProcessor;
+
+ private String topic;
+
+ @Inject
+ public KafkaEventSubscriber(
+ KafkaSubscriberProperties configuration,
+ KafkaConsumerFactory consumerFactory,
+ Deserializer<byte[]> keyDeserializer,
+ Deserializer<EventMessage> valueDeserializer,
+ OneOffRequestContext oneOffCtx,
+ @ConsumerExecutor ExecutorService executor,
+ KafkaEventSubscriberMetrics subscriberMetrics) {
+
+ this.configuration = configuration;
+ this.oneOffCtx = oneOffCtx;
+ this.executor = executor;
+ this.subscriberMetrics = subscriberMetrics;
+
+ final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+ this.consumer = consumerFactory.create(keyDeserializer);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousClassLoader);
+ }
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+ this.topic = topic;
+ this.messageProcessor = messageProcessor;
+ logger.atInfo().log(
+ "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
+ consumer.subscribe(Collections.singleton(topic));
+ executor.execute(new ReceiverJob());
+ }
+
+ public void shutdown() {
+ closed.set(true);
+ consumer.wakeup();
+ }
+
+ public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+ return messageProcessor;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ private class ReceiverJob implements Runnable {
+
+ @Override
+ public void run() {
+ consume();
+ }
+
+ private void consume() {
+ try {
+ while (!closed.get()) {
+ ConsumerRecords<byte[], byte[]> consumerRecords =
+ consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+ consumerRecords.forEach(
+ consumerRecord -> {
+ try (ManualRequestContext ctx = oneOffCtx.open()) {
+ EventMessage event =
+ valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+ messageProcessor.accept(event);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Malformed event '%s': [Exception: %s]",
+ new String(consumerRecord.value(), UTF_8));
+ subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+ }
+ });
+ }
+ } catch (WakeupException e) {
+ // Ignore exception if closing
+ if (!closed.get()) throw e;
+ } catch (Exception e) {
+ subscriberMetrics.incrementSubscriberFailedToPollMessages();
+ throw e;
+ } finally {
+ consumer.close();
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
new file mode 100644
index 0000000..81174f9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -0,0 +1,48 @@
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class KafkaEventSubscriberMetrics {
+
+ private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
+ "subscriber_msg_consumer_poll_failure_counter";
+ private static final String SUBSCRIBER_FAILURE_COUNTER =
+ "subscriber_msg_consumer_failure_counter";
+
+ private final Counter1<String> subscriberPollFailureCounter;
+ private final Counter1<String> subscriberFailureCounter;
+
+ @Inject
+ public KafkaEventSubscriberMetrics(MetricMaker metricMaker) {
+ this.subscriberPollFailureCounter =
+ metricMaker.newCounter(
+ "kafka/subscriber/subscriber_message_consumer_poll_failure_counter",
+ new Description("Number of failed attempts to poll messages by the subscriber")
+ .setRate()
+ .setUnit("errors"),
+ Field.ofString(
+ SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
+ this.subscriberFailureCounter =
+ metricMaker.newCounter(
+ "kafka/subscriber/subscriber_message_consumer_failure_counter",
+ new Description("Number of messages failed to consume by the subscriber consumer")
+ .setRate()
+ .setUnit("errors"),
+ Field.ofString(
+ SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+ }
+
+ public void incrementSubscriberFailedToPollMessages() {
+ subscriberPollFailureCounter.increment(SUBSCRIBER_POLL_FAILURE_COUNTER);
+ }
+
+ public void incrementSubscriberFailedToConsumeMessage() {
+ subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
+ }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 685c195..c6dc18e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -1 +1,27 @@
-This plugin can publish gerrit stream events to an Apache Kafka topic.
+This plugin publishes gerrit stream events to an Apache Kafka topic.
+
+It also provides a Kafka-based implementation of a generic
+[Events Broker Api](https://github.com/GerritForge/events-broker) which can be used by
+Gerrit and other plugins.
+
+Use-cases
+=========
+
+CI/CD Validation
+----------------
+
+Gerrit stream events can be published to the internal network where other subscribers
+can trigger automated jobs (e.g. CI/CD validation) for fetching the changes and validating
+them through build and testing.
+
+__NOTE__: This use-case would require a CI/CD system (e.g. Jenkins, Zuul or other) and
+the development of a Kafka-based subscriber to receive the event and trigger the build.
+
+Events replication
+------------------
+
+Multiple Gerrit masters in a multi-site setup can be informed on the stream events
+happening on every node thanks to the notification to a Kafka pub/sub topic.
+
+__NOTE__: This use-case would require the [multi-site plugin](https://gerrit.googlesource.com/plugins/multi-site)
+on each of the Gerrit masters that are part of the same multi-site cluster.
\ No newline at end of file
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 96e4b94..faa5bf1 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -31,4 +31,15 @@
| lingerMs | 1
| bufferMemory | 33554432
| keySerializer | org.apache.kafka.common.serialization.StringSerializer
-| valueSerializer | org.apache.kafka.common.serialization.StringSerializer
\ No newline at end of file
+| valueSerializer | org.apache.kafka.common.serialization.StringSerializer
+
+Additional properties
+---------------------
+
+`plugin.kafka-events.groupId`
+: Kafka consumer group for receiving messages.
+ Default: Gerrit instance-id
+
+`plugin.kafka-events.pollingIntervalMs`
+: Polling interval in msec for receiving messages from Kafka topic subscription.
+ Default: 1000
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 12a448c..95d2603 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -16,7 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
-import com.google.common.base.Supplier;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.common.collect.Iterables;
import com.google.gerrit.acceptance.GerritConfig;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -28,10 +28,7 @@
import com.google.gerrit.extensions.common.ChangeMessageInfo;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventDeserializer;
-import com.google.gerrit.server.events.SupplierDeserializer;
import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import java.util.ArrayList;
import java.util.Collections;
@@ -115,11 +112,7 @@
assertThat(events).hasSize(6);
String commentAddedEventJson = Iterables.getLast(events);
- Gson gson =
- new GsonBuilder()
- .registerTypeAdapter(Event.class, new EventDeserializer())
- .registerTypeAdapter(Supplier.class, new SupplierDeserializer())
- .create();
+ Gson gson = new EventGsonProvider().get();
Event event = gson.fromJson(commentAddedEventJson, Event.class);
assertThat(event).isInstanceOf(CommentAddedEvent.class);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
new file mode 100644
index 0000000..e456a2a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gson.Gson;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaEventDeserializerTest {
+ private KafkaEventDeserializer deserializer;
+
+ @Before
+ public void setUp() {
+ final Gson gson = new EventGsonProvider().get();
+ deserializer = new KafkaEventDeserializer(gson);
+ }
+
+ @Test
+ public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+ final UUID eventId = UUID.randomUUID();
+ final String eventType = "event-type";
+ final UUID sourceInstanceId = UUID.randomUUID();
+ final long eventCreatedOn = 10L;
+ final String eventJson =
+ String.format(
+ "{ "
+ + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
+ + "\"body\": { \"type\": \"project-created\" }"
+ + "}",
+ eventId, eventType, sourceInstanceId, eventCreatedOn);
+ final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
+
+ assertThat(event.getHeader().eventId).isEqualTo(eventId);
+ assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void kafkaEventDeserializerShouldFailForInvalidJson() {
+ deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
+ deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
+ }
+}