Move Kafka consumer logic to a separate class
Moving Kafka consumer logic to a separate class will allow us
to use BrokerApi for message consuming.
Feature: Issue 10829
Change-Id: Ib070e132891c53ad33e9b7538213a3e1a3b5b2e0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 457555b..64af68c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -14,13 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.permissions.PermissionBackendException;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.gwtorm.server.OrmException;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
@@ -31,105 +27,44 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
public abstract class AbstractKafkaSubcriber implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final Consumer<byte[], byte[]> consumer;
+ private final KafkaEventSubscriber subscriber;
private final ForwardedEventRouter eventRouter;
private final DynamicSet<DroppedEventListener> droppedEventListeners;
private final Gson gson;
private final UUID instanceId;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
- private final KafkaConfiguration configuration;
- private final KafkaConsumerFactory consumerFactory;
- private final OneOffRequestContext oneOffCtx;
private final MessageLogger msgLog;
private SubscriberMetrics subscriberMetrics;
public AbstractKafkaSubcriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ KafkaEventSubscriber subscriber,
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
- this.configuration = configuration;
- this.consumerFactory = consumerFactory;
this.eventRouter = eventRouter;
this.droppedEventListeners = droppedEventListeners;
this.gson = gson;
this.instanceId = instanceId;
- this.oneOffCtx = oneOffCtx;
this.msgLog = msgLog;
this.subscriberMetrics = subscriberMetrics;
- final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
- this.consumer = consumerFactory.create(keyDeserializer, instanceId);
- } finally {
- Thread.currentThread().setContextClassLoader(previousClassLoader);
- }
- this.valueDeserializer = valueDeserializer;
+ this.subscriber = subscriber;
}
@Override
public void run() {
- final String topic = configuration.getKafka().getTopicAlias(getTopic());
- subscribe(topic);
+ subscriber.subscribe(getTopic(), this::processRecord);
}
protected abstract EventTopic getTopic();
- public void subscribe(String topic) {
- try {
-
- logger.atInfo().log("Kafka consumer subscribing to topic alias [%s]", topic);
- consumer.subscribe(Collections.singleton(topic));
- while (!closed.get()) {
- ConsumerRecords<byte[], byte[]> consumerRecords =
- consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
- consumerRecords.forEach(
- consumerRecord -> {
- try (ManualRequestContext ctx = oneOffCtx.open()) {
- SourceAwareEventWrapper event =
- valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
- processRecord(event);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Malformed event '%s': [Exception: %s]",
- new String(consumerRecord.value(), UTF_8));
- subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
- }
- });
- }
- } catch (WakeupException e) {
- // Ignore exception if closing
- if (!closed.get()) throw e;
- } catch (Exception e) {
- subscriberMetrics.incrementSubscriberFailedToPollMessages();
- throw e;
- } finally {
- consumer.close();
- }
- }
-
private void processRecord(SourceAwareEventWrapper event) {
if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
@@ -156,7 +91,6 @@
// Shutdown hook which can be called from a separate thread
public void shutdown() {
- closed.set(true);
- consumer.wakeup();
+ subscriber.shutdown();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 9e55430..28d068e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
@Singleton
public class IndexEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public IndexEventSubscriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ KafkaEventSubscriber subscriber,
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
@InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- configuration,
- consumerFactory,
- keyDeserializer,
- valueDeserializer,
+ subscriber,
eventRouter,
droppedEventListeners,
gsonProvider,
instanceId,
- oneOffCtx,
msgLog,
subscriberMetrics);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index 6663a64..26725d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
@Singleton
public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public KafkaCacheEvictionEventSubscriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ KafkaEventSubscriber subscriber,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
@InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- configuration,
- consumerFactory,
- keyDeserializer,
- valueDeserializer,
+ subscriber,
eventRouter,
droppedEventListeners,
gsonProvider,
instanceId,
- oneOffCtx,
msgLog,
subscriberMetrics);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
new file mode 100644
index 0000000..222cf94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventSubscriber {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Consumer<byte[], byte[]> consumer;
+ private final OneOffRequestContext oneOffCtx;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
+ private final KafkaConfiguration configuration;
+ private final SubscriberMetrics subscriberMetrics;
+
+ @Inject
+ public KafkaEventSubscriber(
+ KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
+ Deserializer<byte[]> keyDeserializer,
+ Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ @InstanceId UUID instanceId,
+ OneOffRequestContext oneOffCtx,
+ SubscriberMetrics subscriberMetrics) {
+
+ this.configuration = configuration;
+ this.oneOffCtx = oneOffCtx;
+ this.subscriberMetrics = subscriberMetrics;
+
+ final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+ this.consumer = consumerFactory.create(keyDeserializer, instanceId);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousClassLoader);
+ }
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ public void subscribe(
+ EventTopic evenTopic, java.util.function.Consumer<SourceAwareEventWrapper> messageProcessor) {
+ try {
+ final String topic = configuration.getKafka().getTopicAlias(evenTopic);
+ logger.atInfo().log(
+ "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, evenTopic);
+ consumer.subscribe(Collections.singleton(topic));
+ while (!closed.get()) {
+ ConsumerRecords<byte[], byte[]> consumerRecords =
+ consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
+ consumerRecords.forEach(
+ consumerRecord -> {
+ try (ManualRequestContext ctx = oneOffCtx.open()) {
+ SourceAwareEventWrapper event =
+ valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+ messageProcessor.accept(event);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Malformed event '%s': [Exception: %s]",
+ new String(consumerRecord.value(), UTF_8));
+ subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+ }
+ });
+ }
+ } catch (WakeupException e) {
+ // Ignore exception if closing
+ if (!closed.get()) throw e;
+ } catch (Exception e) {
+ subscriberMetrics.incrementSubscriberFailedToPollMessages();
+ throw e;
+ } finally {
+ consumer.close();
+ }
+ }
+
+ public void shutdown() {
+ closed.set(true);
+ consumer.wakeup();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index e581b0e..e47a515 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
@Singleton
public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ KafkaEventSubscriber subscriber,
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- configuration,
- consumerFactory,
- keyDeserializer,
- valueDeserializer,
+ subscriber,
eventRouter,
droppedEventListeners,
gson,
instanceId,
- oneOffCtx,
msgLog,
subscriberMetrics);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index dd882cd..f07d969 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
@Singleton
public class StreamEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public StreamEventSubscriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
+ KafkaEventSubscriber subscriber,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- configuration,
- consumerFactory,
- keyDeserializer,
- valueDeserializer,
+ subscriber,
eventRouter,
droppedEventListeners,
gson,
instanceId,
- oneOffCtx,
msgLog,
subscriberMetrics);
}