Re-run consumer when it exits because of an error
When a Kafka consumer exits because of an error, re-run
the subscription so that it can smoothly recover from
temporary failures.
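
The re-run logic sleeps for a randomized delay before creating a new
consumer and re-subscribing, so that subscribers hit by the same failure
do not all retry at exactly the same instant. A minimal standalone sketch
of the delay computation used in this change (the class name is
illustrative, not part of the plugin):

    import java.util.Random;

    class ReconnectDelay {
      // Mirrors DELAY_RECONNECT_AFTER_FAILURE_MSEC in the patch below.
      private static final int AVERAGE_DELAY_MSEC = 1000;
      private final Random random = new Random();

      // Uniform in [AVERAGE/2, AVERAGE*3/2), i.e. mean AVERAGE_DELAY_MSEC,
      // which jitters retries instead of hammering at a fixed interval.
      long nextDelayMsec() {
        return AVERAGE_DELAY_MSEC / 2 + random.nextInt(AVERAGE_DELAY_MSEC);
      }
    }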
Explicitly test the use case of reconnecting after a Kafka failure.
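
A reconnect test along these lines would stop and restart the broker
between two deliveries; the outline below is purely illustrative
(restartBroker() and the fixture fields are assumptions, not helpers
shipped by this change):

    // Hypothetical outline of a reconnect test; not part of this change.
    @Test
    public void shouldReceiveAfterBrokerRestart() throws Exception {
      TestConsumer testConsumer = new TestConsumer(1);
      kafkaBrokerApi.receiveAsync("test_topic", testConsumer);

      restartBroker(); // assumed helper: simulate a temporary Kafka outage

      // The subscriber is expected to reconnect after its randomized delay
      // and still deliver messages sent once the broker is back.
      kafkaBrokerApi.send("test_topic", testEventMessage);
      assertThat(testConsumer.await()).isTrue();
    }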
Bug: Issue 12463
Change-Id: Id061475197d3a481d3fb3bd1210936caee64119c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 7bb1f4f..2adf28f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.kafka.config;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.gerrit.extensions.annotations.PluginName;
@@ -44,6 +45,14 @@
initDockerizedKafkaServer();
}
+ @VisibleForTesting
+ public KafkaProperties() {
+ super();
+ setDefaults();
+ topic = "gerrit";
+ initDockerizedKafkaServer();
+ }
+
private void setDefaults() {
put("acks", "all");
put("retries", 0);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index c3fd917..ed1b44b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.kafka.config;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.server.config.PluginConfigFactory;
import com.google.inject.Inject;
@@ -39,6 +40,13 @@
Integer.parseInt(getProperty("number.of.subscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
}
+ @VisibleForTesting
+ public KafkaSubscriberProperties(int pollingInterval, String groupId, int numberOfSubscribers) {
+ this.pollingInterval = pollingInterval;
+ this.groupId = groupId;
+ this.numberOfSubscribers = numberOfSubscribers;
+ }
+
public Integer getPollingInterval() {
return pollingInterval;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 0e6a353..40415a4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -24,6 +24,7 @@
import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
import java.time.Duration;
import java.util.Collections;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
@@ -33,8 +34,8 @@
public class KafkaEventSubscriber {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
- private final Consumer<byte[], byte[]> consumer;
private final OneOffRequestContext oneOffCtx;
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -42,12 +43,15 @@
private final KafkaSubscriberProperties configuration;
private final ExecutorService executor;
private final KafkaEventSubscriberMetrics subscriberMetrics;
+ private final KafkaConsumerFactory consumerFactory;
+ private final Deserializer<byte[]> keyDeserializer;
private java.util.function.Consumer<EventMessage> messageProcessor;
-
private String topic;
private AtomicBoolean resetOffset = new AtomicBoolean(false);
+ private volatile ReceiverJob receiver;
+
@Inject
public KafkaEventSubscriber(
KafkaSubscriberProperties configuration,
@@ -62,14 +66,8 @@
this.oneOffCtx = oneOffCtx;
this.executor = executor;
this.subscriberMetrics = subscriberMetrics;
-
- final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
- this.consumer = consumerFactory.create(keyDeserializer);
- } finally {
- Thread.currentThread().setContextClassLoader(previousClassLoader);
- }
+ this.consumerFactory = consumerFactory;
+ this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
@@ -78,13 +76,25 @@
this.messageProcessor = messageProcessor;
logger.atInfo().log(
"Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
- consumer.subscribe(Collections.singleton(topic));
- executor.execute(new ReceiverJob());
+ runReceiver();
+ }
+
+ private void runReceiver() {
+ final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+ Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
+ consumer.subscribe(Collections.singleton(topic));
+ receiver = new ReceiverJob(consumer);
+ executor.execute(receiver);
+ } finally {
+ Thread.currentThread().setContextClassLoader(previousClassLoader);
+ }
}
public void shutdown() {
closed.set(true);
- consumer.wakeup();
+ receiver.wakeup();
}
public java.util.function.Consumer<EventMessage> getMessageProcessor() {
@@ -100,13 +110,26 @@
}
private class ReceiverJob implements Runnable {
+ private final Consumer<byte[], byte[]> consumer;
+
+ public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+ this.consumer = consumer;
+ }
+
+ public void wakeup() {
+ consumer.wakeup();
+ }
@Override
public void run() {
- consume();
+ try {
+ consume();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+ }
}
- private void consume() {
+ private void consume() throws InterruptedException {
try {
while (!closed.get()) {
if (resetOffset.getAndSet(false)) {
@@ -132,14 +155,26 @@
// Ignore exception if closing
if (!closed.get()) {
logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+ reconnectAfterFailure();
}
} catch (Exception e) {
subscriberMetrics.incrementSubscriberFailedToPollMessages();
logger.atSevere().withCause(e).log(
"Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+ reconnectAfterFailure();
} finally {
consumer.close();
}
}
+
+ private void reconnectAfterFailure() throws InterruptedException {
+ // Random delay averaging DELAY_RECONNECT_AFTER_FAILURE_MSEC,
+ // so that subscribers do not all retry at exactly the same interval after a failure
+ long reconnectDelay =
+ DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+ + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+ Thread.sleep(reconnectDelay);
+ runReceiver();
+ }
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
new file mode 100644
index 0000000..9078d05
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -0,0 +1,188 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.containers.KafkaContainer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerApiTest {
+ private static KafkaContainer kafka;
+ private static Injector injector;
+ private static KafkaSession session;
+ private static Gson gson;
+
+ private static final int TEST_NUM_SUBSCRIBERS = 1;
+ private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
+ private static final int TEST_POLLING_INTERVAL_MSEC = 100;
+ private static final int TEST_THREAD_POOL_SIZE = 10;
+ private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+ private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS;
+ private static final int TEST_TIMEOUT = 30;
+
+ public static class TestKafkaContainer extends KafkaContainer {
+ public TestKafkaContainer() {
+ addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+ addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
+ }
+
+ @Override
+ public String getBootstrapServers() {
+ return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), KAFKA_PORT);
+ }
+ }
+
+ public static class TestWorkQueue extends WorkQueue {
+
+ @Inject
+ public TestWorkQueue(IdGenerator idGenerator, MetricMaker metrics) {
+ super(idGenerator, TEST_THREAD_POOL_SIZE, metrics);
+ }
+ }
+
+ public static class TestModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+ bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS));
+ bind(OneOffRequestContext.class)
+ .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
+
+ KafkaProperties kafkaProperties = new KafkaProperties();
+ bind(KafkaProperties.class).toInstance(kafkaProperties);
+ bind(KafkaSession.class).in(Scopes.SINGLETON);
+ KafkaSubscriberProperties kafkaSubscriberProperties =
+ new KafkaSubscriberProperties(
+ TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
+ bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+ bind(WorkQueue.class).to(TestWorkQueue.class);
+ }
+ }
+
+ public static class TestConsumer implements Consumer<EventMessage> {
+ public final List<EventMessage> messages = new ArrayList<>();
+ private final CountDownLatch lock;
+
+ public TestConsumer(int numMessagesExpected) {
+ lock = new CountDownLatch(numMessagesExpected);
+ }
+
+ @Override
+ public void accept(EventMessage message) {
+ messages.add(message);
+ lock.countDown();
+ }
+
+ public boolean await() {
+ try {
+ return lock.await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ }
+
+ public static class TestHeader extends Header {
+
+ public TestHeader() {
+ super(UUID.randomUUID(), TEST_INSTANCE_ID);
+ }
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ kafka = new TestKafkaContainer();
+ kafka.start();
+ System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
+
+ Injector baseInjector = Guice.createInjector(new TestModule());
+ WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
+ KafkaSubscriberProperties kafkaSubscriberProperties =
+ baseInjector.getInstance(KafkaSubscriberProperties.class);
+ injector =
+ baseInjector.createChildInjector(
+ new KafkaApiModule(testWorkQueue, kafkaSubscriberProperties));
+ session = injector.getInstance(KafkaSession.class);
+ gson = injector.getInstance(Gson.class);
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ if (kafka != null) {
+ kafka.stop();
+ }
+ }
+
+ @Before
+ public void setup() {
+ session.connect();
+ }
+
+ @After
+ public void teardown() {
+ session.disconnect();
+ }
+
+ @Test
+ public void shouldSendAndReceiveToTopic() {
+ KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+ String testTopic = "test_topic";
+ TestConsumer testConsumer = new TestConsumer(1);
+ EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+ kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+ kafkaBrokerApi.send(testTopic, testEventMessage);
+
+ assertThat(testConsumer.await()).isTrue();
+ assertThat(testConsumer.messages).hasSize(1);
+ assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+ }
+}