Abstract Publisher/Subscriber into generic interfaces
Decouple the interface to send/receive messages to Kafka
using generic interfaces bound to the actual native implementation
classes.
The KafkaSession does not refer anymore to a native KafkaProducer
allowing other implementations to be plugged in.
This is a preparation work to introduce a REST-API client
based access to Kafka REST Proxy through parameters on
the events broker plugin config.
Change-Id: I6034915c5538e3df365a45e2f134bab50aff932f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2f74b44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -26,7 +26,7 @@
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
class Module extends AbstractModule {
@@ -43,8 +43,7 @@
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
- bind(new TypeLiteral<KafkaProducer<String, String>>() {})
- .toProvider(KafkaProducerProvider.class);
+ bind(new TypeLiteral<Producer<String, String>>() {}).toProvider(KafkaProducerProvider.class);
install(kafkaBrokerModule);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 73c7509..b76ff7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -28,6 +28,8 @@
import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventDeserializer;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventNativeSubscriber;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -54,6 +56,7 @@
@Override
protected void configure() {
+ bind(KafkaEventSubscriber.class).to(KafkaEventNativeSubscriber.class);
bind(ExecutorService.class)
.annotatedWith(ConsumerExecutor.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
index b1f11f7..4fb98b2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -18,8 +18,9 @@
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
-public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+public class KafkaProducerProvider implements Provider<Producer<String, String>> {
private final KafkaProperties properties;
@Inject
@@ -28,7 +29,7 @@
}
@Override
- public KafkaProducer<String, String> get() {
+ public Producer<String, String> get() {
return new KafkaProducer<>(properties);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 0e851aa..f7105f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -19,7 +19,6 @@
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -30,13 +29,13 @@
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
private final KafkaProperties properties;
- private final Provider<KafkaProducer<String, String>> producerProvider;
+ private final Provider<Producer<String, String>> producerProvider;
private final KafkaEventsPublisherMetrics publisherMetrics;
private volatile Producer<String, String> producer;
@Inject
public KafkaSession(
- Provider<KafkaProducer<String, String>> producerProvider,
+ Provider<Producer<String, String>> producerProvider,
KafkaProperties properties,
KafkaEventsPublisherMetrics publisherMetrics) {
this.producerProvider = producerProvider;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
new file mode 100644
index 0000000..a98e098
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
@@ -0,0 +1,207 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventNativeSubscriber implements KafkaEventSubscriber {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+
+ private final OneOffRequestContext oneOffCtx;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private final Deserializer<EventMessage> valueDeserializer;
+ private final KafkaSubscriberProperties configuration;
+ private final ExecutorService executor;
+ private final KafkaEventSubscriberMetrics subscriberMetrics;
+ private final KafkaConsumerFactory consumerFactory;
+ private final Deserializer<byte[]> keyDeserializer;
+
+ private java.util.function.Consumer<EventMessage> messageProcessor;
+ private String topic;
+ private AtomicBoolean resetOffset = new AtomicBoolean(false);
+
+ private volatile ReceiverJob receiver;
+
+ @Inject
+ public KafkaEventNativeSubscriber(
+ KafkaSubscriberProperties configuration,
+ KafkaConsumerFactory consumerFactory,
+ Deserializer<byte[]> keyDeserializer,
+ Deserializer<EventMessage> valueDeserializer,
+ OneOffRequestContext oneOffCtx,
+ @ConsumerExecutor ExecutorService executor,
+ KafkaEventSubscriberMetrics subscriberMetrics) {
+
+ this.configuration = configuration;
+ this.oneOffCtx = oneOffCtx;
+ this.executor = executor;
+ this.subscriberMetrics = subscriberMetrics;
+ this.consumerFactory = consumerFactory;
+ this.keyDeserializer = keyDeserializer;
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#subscribe(java.lang.String, java.util.function.Consumer)
+ */
+ @Override
+ public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+ this.topic = topic;
+ this.messageProcessor = messageProcessor;
+ logger.atInfo().log(
+ "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
+ runReceiver();
+ }
+
+ private void runReceiver() {
+ final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread()
+ .setContextClassLoader(KafkaEventNativeSubscriber.class.getClassLoader());
+ Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
+ consumer.subscribe(Collections.singleton(topic));
+ receiver = new ReceiverJob(consumer);
+ executor.execute(receiver);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousClassLoader);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#shutdown()
+ */
+ @Override
+ public void shutdown() {
+ closed.set(true);
+ receiver.wakeup();
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getMessageProcessor()
+ */
+ @Override
+ public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+ return messageProcessor;
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getTopic()
+ */
+ @Override
+ public String getTopic() {
+ return topic;
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#resetOffset()
+ */
+ @Override
+ public void resetOffset() {
+ resetOffset.set(true);
+ }
+
+ private class ReceiverJob implements Runnable {
+ private final Consumer<byte[], byte[]> consumer;
+
+ public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+ this.consumer = consumer;
+ }
+
+ public void wakeup() {
+ consumer.wakeup();
+ }
+
+ @Override
+ public void run() {
+ try {
+ consume();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+ }
+ }
+
+ private void consume() throws InterruptedException {
+ try {
+ while (!closed.get()) {
+ if (resetOffset.getAndSet(false)) {
+ // Make sure there is an assignment for this consumer
+ while (consumer.assignment().isEmpty() && !closed.get()) {
+ logger.atInfo().log(
+ "Resetting offset: no partitions assigned to the consumer, request assignment.");
+ consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+ }
+ consumer.seekToBeginning(consumer.assignment());
+ }
+ ConsumerRecords<byte[], byte[]> consumerRecords =
+ consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+ consumerRecords.forEach(
+ consumerRecord -> {
+ try (ManualRequestContext ctx = oneOffCtx.open()) {
+ EventMessage event =
+ valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+ messageProcessor.accept(event);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Malformed event '%s': [Exception: %s]",
+ new String(consumerRecord.value(), UTF_8));
+ subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+ }
+ });
+ }
+ } catch (WakeupException e) {
+ // Ignore exception if closing
+ if (!closed.get()) {
+ logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+ reconnectAfterFailure();
+ }
+ } catch (Exception e) {
+ subscriberMetrics.incrementSubscriberFailedToPollMessages();
+ logger.atSevere().withCause(e).log(
+ "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+ reconnectAfterFailure();
+ } finally {
+ consumer.close();
+ }
+ }
+
+ private void reconnectAfterFailure() throws InterruptedException {
+ // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
+ // for avoiding hammering exactly at the same interval in case of failure
+ long reconnectDelay =
+ DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+ + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+ Thread.sleep(reconnectDelay);
+ runReceiver();
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 7ef9d7b..6315dea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -11,176 +11,39 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+
package com.googlesource.gerrit.plugins.kafka.subscribe;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
-import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
-public class KafkaEventSubscriber {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+/** Generic interface to a Kafka topic subscriber. */
+public interface KafkaEventSubscriber {
- private final OneOffRequestContext oneOffCtx;
- private final AtomicBoolean closed = new AtomicBoolean(false);
+ /**
+ * Subscribe to a topic and receive messages asynchronously.
+ *
+ * @param topic Kafka topic name
+ * @param messageProcessor consumer function for processing incoming messages
+ */
+ void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor);
- private final Deserializer<EventMessage> valueDeserializer;
- private final KafkaSubscriberProperties configuration;
- private final ExecutorService executor;
- private final KafkaEventSubscriberMetrics subscriberMetrics;
- private final KafkaConsumerFactory consumerFactory;
- private final Deserializer<byte[]> keyDeserializer;
+ /** Shutdown Kafka consumer. */
+ void shutdown();
- private java.util.function.Consumer<EventMessage> messageProcessor;
- private String topic;
- private AtomicBoolean resetOffset = new AtomicBoolean(false);
+ /**
+ * Returns the current consumer function for the subscribed topic.
+ *
+ * @return the default topic consumer function.
+ */
+ java.util.function.Consumer<EventMessage> getMessageProcessor();
- private volatile ReceiverJob receiver;
+ /**
+ * Returns the current subscribed topic name.
+ *
+ * @return Kafka topic name.
+ */
+ String getTopic();
- @Inject
- public KafkaEventSubscriber(
- KafkaSubscriberProperties configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<EventMessage> valueDeserializer,
- OneOffRequestContext oneOffCtx,
- @ConsumerExecutor ExecutorService executor,
- KafkaEventSubscriberMetrics subscriberMetrics) {
-
- this.configuration = configuration;
- this.oneOffCtx = oneOffCtx;
- this.executor = executor;
- this.subscriberMetrics = subscriberMetrics;
- this.consumerFactory = consumerFactory;
- this.keyDeserializer = keyDeserializer;
- this.valueDeserializer = valueDeserializer;
- }
-
- public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
- this.topic = topic;
- this.messageProcessor = messageProcessor;
- logger.atInfo().log(
- "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
- runReceiver();
- }
-
- private void runReceiver() {
- final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
- Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
- consumer.subscribe(Collections.singleton(topic));
- receiver = new ReceiverJob(consumer);
- executor.execute(receiver);
- } finally {
- Thread.currentThread().setContextClassLoader(previousClassLoader);
- }
- }
-
- public void shutdown() {
- closed.set(true);
- receiver.wakeup();
- }
-
- public java.util.function.Consumer<EventMessage> getMessageProcessor() {
- return messageProcessor;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void resetOffset() {
- resetOffset.set(true);
- }
-
- private class ReceiverJob implements Runnable {
- private final Consumer<byte[], byte[]> consumer;
-
- public ReceiverJob(Consumer<byte[], byte[]> consumer) {
- this.consumer = consumer;
- }
-
- public void wakeup() {
- consumer.wakeup();
- }
-
- @Override
- public void run() {
- try {
- consume();
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
- }
- }
-
- private void consume() throws InterruptedException {
- try {
- while (!closed.get()) {
- if (resetOffset.getAndSet(false)) {
- // Make sure there is an assignment for this consumer
- while (consumer.assignment().isEmpty() && !closed.get()) {
- logger.atInfo().log(
- "Resetting offset: no partitions assigned to the consumer, request assignment.");
- consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
- }
- consumer.seekToBeginning(consumer.assignment());
- }
- ConsumerRecords<byte[], byte[]> consumerRecords =
- consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
- consumerRecords.forEach(
- consumerRecord -> {
- try (ManualRequestContext ctx = oneOffCtx.open()) {
- EventMessage event =
- valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
- messageProcessor.accept(event);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Malformed event '%s': [Exception: %s]",
- new String(consumerRecord.value(), UTF_8));
- subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
- }
- });
- }
- } catch (WakeupException e) {
- // Ignore exception if closing
- if (!closed.get()) {
- logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
- reconnectAfterFailure();
- }
- } catch (Exception e) {
- subscriberMetrics.incrementSubscriberFailedToPollMessages();
- logger.atSevere().withCause(e).log(
- "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
- reconnectAfterFailure();
- } finally {
- consumer.close();
- }
- }
-
- private void reconnectAfterFailure() throws InterruptedException {
- // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
- // for avoiding hammering exactly at the same interval in case of failure
- long reconnectDelay =
- DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
- + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
- Thread.sleep(reconnectDelay);
- runReceiver();
- }
- }
+ /** Reset the offset for reading incoming Kafka messages of the topic. */
+ void resetOffset();
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 5a85b81..af0d53f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -45,7 +45,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.After;
import org.junit.AfterClass;
@@ -103,8 +103,7 @@
new KafkaSubscriberProperties(
TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
- bind(new TypeLiteral<KafkaProducer<String, String>>() {})
- .toProvider(KafkaProducerProvider.class);
+ bind(new TypeLiteral<Producer<String, String>>() {}).toProvider(KafkaProducerProvider.class);
bind(WorkQueue.class).to(TestWorkQueue.class);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 46eaa73..ccf60ce 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -25,7 +25,7 @@
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
@@ -39,7 +39,7 @@
@RunWith(MockitoJUnitRunner.class)
public class KafkaSessionTest {
KafkaSession objectUnderTest;
- @Mock KafkaProducer<String, String> kafkaProducer;
+ @Mock Producer<String, String> kafkaProducer;
@Mock KafkaProducerProvider producerProvider;
@Mock KafkaProperties properties;
@Mock KafkaEventsPublisherMetrics publisherMetrics;