Abstract Publisher/Subscriber into generic interfaces

Decouple the interface to send/receive messages to Kafka
using generic interfaces bound to the actual native implementation
classes.

The KafkaSession no longer refers to a native KafkaProducer,
allowing other implementations to be plugged in.

This is preparation work to introduce REST-API-client-based
access to the Kafka REST Proxy, enabled through parameters on
the events broker plugin config.

Change-Id: I6034915c5538e3df365a45e2f134bab50aff932f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2f74b44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -26,7 +26,7 @@
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 
 class Module extends AbstractModule {
 
@@ -43,8 +43,7 @@
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
     DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
 
-    bind(new TypeLiteral<KafkaProducer<String, String>>() {})
-        .toProvider(KafkaProducerProvider.class);
+    bind(new TypeLiteral<Producer<String, String>>() {}).toProvider(KafkaProducerProvider.class);
 
     install(kafkaBrokerModule);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 73c7509..b76ff7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -28,6 +28,8 @@
 import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventDeserializer;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventNativeSubscriber;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -54,6 +56,7 @@
 
   @Override
   protected void configure() {
+    bind(KafkaEventSubscriber.class).to(KafkaEventNativeSubscriber.class);
 
     bind(ExecutorService.class)
         .annotatedWith(ConsumerExecutor.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
index b1f11f7..4fb98b2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -18,8 +18,9 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 
-public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+public class KafkaProducerProvider implements Provider<Producer<String, String>> {
   private final KafkaProperties properties;
 
   @Inject
@@ -28,7 +29,7 @@
   }
 
   @Override
-  public KafkaProducer<String, String> get() {
+  public Producer<String, String> get() {
     return new KafkaProducer<>(properties);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 0e851aa..f7105f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -19,7 +19,6 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
 import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -30,13 +29,13 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
   private final KafkaProperties properties;
-  private final Provider<KafkaProducer<String, String>> producerProvider;
+  private final Provider<Producer<String, String>> producerProvider;
   private final KafkaEventsPublisherMetrics publisherMetrics;
   private volatile Producer<String, String> producer;
 
   @Inject
   public KafkaSession(
-      Provider<KafkaProducer<String, String>> producerProvider,
+      Provider<Producer<String, String>> producerProvider,
       KafkaProperties properties,
       KafkaEventsPublisherMetrics publisherMetrics) {
     this.producerProvider = producerProvider;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
new file mode 100644
index 0000000..a98e098
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
@@ -0,0 +1,207 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventNativeSubscriber implements KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<EventMessage> valueDeserializer;
+  private final KafkaSubscriberProperties configuration;
+  private final ExecutorService executor;
+  private final KafkaEventSubscriberMetrics subscriberMetrics;
+  private final KafkaConsumerFactory consumerFactory;
+  private final Deserializer<byte[]> keyDeserializer;
+
+  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private String topic;
+  private AtomicBoolean resetOffset = new AtomicBoolean(false);
+
+  private volatile ReceiverJob receiver;
+
+  @Inject
+  public KafkaEventNativeSubscriber(
+      KafkaSubscriberProperties configuration,
+      KafkaConsumerFactory consumerFactory,
+      Deserializer<byte[]> keyDeserializer,
+      Deserializer<EventMessage> valueDeserializer,
+      OneOffRequestContext oneOffCtx,
+      @ConsumerExecutor ExecutorService executor,
+      KafkaEventSubscriberMetrics subscriberMetrics) {
+
+    this.configuration = configuration;
+    this.oneOffCtx = oneOffCtx;
+    this.executor = executor;
+    this.subscriberMetrics = subscriberMetrics;
+    this.consumerFactory = consumerFactory;
+    this.keyDeserializer = keyDeserializer;
+    this.valueDeserializer = valueDeserializer;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#subscribe(java.lang.String, java.util.function.Consumer)
+   */
+  @Override
+  public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+    this.topic = topic;
+    this.messageProcessor = messageProcessor;
+    logger.atInfo().log(
+        "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
+    runReceiver();
+  }
+
+  private void runReceiver() {
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread()
+          .setContextClassLoader(KafkaEventNativeSubscriber.class.getClassLoader());
+      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
+      consumer.subscribe(Collections.singleton(topic));
+      receiver = new ReceiverJob(consumer);
+      executor.execute(receiver);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#shutdown()
+   */
+  @Override
+  public void shutdown() {
+    closed.set(true);
+    receiver.wakeup();
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getMessageProcessor()
+   */
+  @Override
+  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getTopic()
+   */
+  @Override
+  public String getTopic() {
+    return topic;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#resetOffset()
+   */
+  @Override
+  public void resetOffset() {
+    resetOffset.set(true);
+  }
+
+  private class ReceiverJob implements Runnable {
+    private final Consumer<byte[], byte[]> consumer;
+
+    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+      this.consumer = consumer;
+    }
+
+    public void wakeup() {
+      consumer.wakeup();
+    }
+
+    @Override
+    public void run() {
+      try {
+        consume();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+      }
+    }
+
+    private void consume() throws InterruptedException {
+      try {
+        while (!closed.get()) {
+          if (resetOffset.getAndSet(false)) {
+            // Make sure there is an assignment for this consumer
+            while (consumer.assignment().isEmpty() && !closed.get()) {
+              logger.atInfo().log(
+                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+            }
+            consumer.seekToBeginning(consumer.assignment());
+          }
+          ConsumerRecords<byte[], byte[]> consumerRecords =
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+          consumerRecords.forEach(
+              consumerRecord -> {
+                try (ManualRequestContext ctx = oneOffCtx.open()) {
+                  EventMessage event =
+                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                  messageProcessor.accept(event);
+                } catch (Exception e) {
+                  logger.atSevere().withCause(e).log(
+                      "Malformed event '%s': [Exception: %s]",
+                      new String(consumerRecord.value(), UTF_8), e.toString());
+                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+                }
+              });
+        }
+      } catch (WakeupException e) {
+        // Ignore exception if closing
+        if (!closed.get()) {
+          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+          reconnectAfterFailure();
+        }
+      } catch (Exception e) {
+        subscriberMetrics.incrementSubscriberFailedToPollMessages();
+        logger.atSevere().withCause(e).log(
+            "Exiting consumer loop of topic %s because of a non-recoverable exception", topic);
+        reconnectAfterFailure();
+      } finally {
+        consumer.close();
+      }
+    }
+
+    private void reconnectAfterFailure() throws InterruptedException {
+      // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
+      // for avoiding hammering exactly at the same interval in case of failure
+      long reconnectDelay =
+          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+      Thread.sleep(reconnectDelay);
+      runReceiver();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 7ef9d7b..6315dea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,176 +11,39 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
-import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
 
-public class KafkaEventSubscriber {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+/** Generic interface to a Kafka topic subscriber. */
+public interface KafkaEventSubscriber {
 
-  private final OneOffRequestContext oneOffCtx;
-  private final AtomicBoolean closed = new AtomicBoolean(false);
+  /**
+   * Subscribe to a topic and receive messages asynchronously.
+   *
+   * @param topic Kafka topic name
+   * @param messageProcessor consumer function for processing incoming messages
+   */
+  void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor);
 
-  private final Deserializer<EventMessage> valueDeserializer;
-  private final KafkaSubscriberProperties configuration;
-  private final ExecutorService executor;
-  private final KafkaEventSubscriberMetrics subscriberMetrics;
-  private final KafkaConsumerFactory consumerFactory;
-  private final Deserializer<byte[]> keyDeserializer;
+  /** Shutdown Kafka consumer. */
+  void shutdown();
 
-  private java.util.function.Consumer<EventMessage> messageProcessor;
-  private String topic;
-  private AtomicBoolean resetOffset = new AtomicBoolean(false);
+  /**
+   * Returns the current consumer function for the subscribed topic.
+   *
+   * @return the default topic consumer function.
+   */
+  java.util.function.Consumer<EventMessage> getMessageProcessor();
 
-  private volatile ReceiverJob receiver;
+  /**
+   * Returns the current subscribed topic name.
+   *
+   * @return Kafka topic name.
+   */
+  String getTopic();
 
-  @Inject
-  public KafkaEventSubscriber(
-      KafkaSubscriberProperties configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<EventMessage> valueDeserializer,
-      OneOffRequestContext oneOffCtx,
-      @ConsumerExecutor ExecutorService executor,
-      KafkaEventSubscriberMetrics subscriberMetrics) {
-
-    this.configuration = configuration;
-    this.oneOffCtx = oneOffCtx;
-    this.executor = executor;
-    this.subscriberMetrics = subscriberMetrics;
-    this.consumerFactory = consumerFactory;
-    this.keyDeserializer = keyDeserializer;
-    this.valueDeserializer = valueDeserializer;
-  }
-
-  public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
-    this.topic = topic;
-    this.messageProcessor = messageProcessor;
-    logger.atInfo().log(
-        "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
-    runReceiver();
-  }
-
-  private void runReceiver() {
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
-      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
-      consumer.subscribe(Collections.singleton(topic));
-      receiver = new ReceiverJob(consumer);
-      executor.execute(receiver);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
-  }
-
-  public void shutdown() {
-    closed.set(true);
-    receiver.wakeup();
-  }
-
-  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
-    return messageProcessor;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public void resetOffset() {
-    resetOffset.set(true);
-  }
-
-  private class ReceiverJob implements Runnable {
-    private final Consumer<byte[], byte[]> consumer;
-
-    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
-      this.consumer = consumer;
-    }
-
-    public void wakeup() {
-      consumer.wakeup();
-    }
-
-    @Override
-    public void run() {
-      try {
-        consume();
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
-      }
-    }
-
-    private void consume() throws InterruptedException {
-      try {
-        while (!closed.get()) {
-          if (resetOffset.getAndSet(false)) {
-            // Make sure there is an assignment for this consumer
-            while (consumer.assignment().isEmpty() && !closed.get()) {
-              logger.atInfo().log(
-                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
-              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
-            }
-            consumer.seekToBeginning(consumer.assignment());
-          }
-          ConsumerRecords<byte[], byte[]> consumerRecords =
-              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
-          consumerRecords.forEach(
-              consumerRecord -> {
-                try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  EventMessage event =
-                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-                  messageProcessor.accept(event);
-                } catch (Exception e) {
-                  logger.atSevere().withCause(e).log(
-                      "Malformed event '%s': [Exception: %s]",
-                      new String(consumerRecord.value(), UTF_8));
-                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-                }
-              });
-        }
-      } catch (WakeupException e) {
-        // Ignore exception if closing
-        if (!closed.get()) {
-          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
-          reconnectAfterFailure();
-        }
-      } catch (Exception e) {
-        subscriberMetrics.incrementSubscriberFailedToPollMessages();
-        logger.atSevere().withCause(e).log(
-            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
-        reconnectAfterFailure();
-      } finally {
-        consumer.close();
-      }
-    }
-
-    private void reconnectAfterFailure() throws InterruptedException {
-      // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
-      // for avoiding hammering exactly at the same interval in case of failure
-      long reconnectDelay =
-          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
-              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
-      Thread.sleep(reconnectDelay);
-      runReceiver();
-    }
-  }
+  /** Reset the offset for reading incoming Kafka messages of the topic. */
+  void resetOffset();
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 5a85b81..af0d53f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -45,7 +45,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -103,8 +103,7 @@
           new KafkaSubscriberProperties(
               TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
       bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
-      bind(new TypeLiteral<KafkaProducer<String, String>>() {})
-          .toProvider(KafkaProducerProvider.class);
+      bind(new TypeLiteral<Producer<String, String>>() {}).toProvider(KafkaProducerProvider.class);
 
       bind(WorkQueue.class).to(TestWorkQueue.class);
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 46eaa73..ccf60ce 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -25,7 +25,7 @@
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Before;
@@ -39,7 +39,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaSessionTest {
   KafkaSession objectUnderTest;
-  @Mock KafkaProducer<String, String> kafkaProducer;
+  @Mock Producer<String, String> kafkaProducer;
   @Mock KafkaProducerProvider producerProvider;
   @Mock KafkaProperties properties;
   @Mock KafkaEventsPublisherMetrics publisherMetrics;