Fix shutdown of consumers, NatsBrokerApi and the NATS connection
Do the shutdown in the following steps to implement graceful shutdown:
- drain connection, this drains all consumers stopping incoming
messages
- disconnect the broker API, unsubscribing consumers and stopping
dispatchers
- close the connection
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
index b05ecdd..838f9ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
@@ -79,7 +79,7 @@
.maxAckPending(pluginConfig.getLong("maxAckPending", 1000L))
.deliverPolicy(DeliverPolicy.All)
.buildPushSubscribeOptions();
- shutdownTimeoutMs = pluginConfig.getInt("shutdownTimeoutMs", 1000);
+ shutdownTimeoutMs = pluginConfig.getInt("shutdownTimeoutMs", 30000);
logger.atInfo().log(
"NATS client configuration: sendStreamEvents: %b, streamEventsSubject: %s, natsOptions: %s, publishOptions: %s",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
index 7c286e0..bfd8512 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
@@ -20,6 +20,7 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.StorageType;
@@ -27,8 +28,13 @@
import io.nats.client.api.StreamInfo;
import java.io.IOException;
import java.security.ProviderException;
+import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
@Singleton
@@ -36,6 +42,7 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Configuration config;
+ private final Connection connection;
private final Set<TopicSubscriber> consumers;
private final BrokerApi brokerApi;
private JetStreamManagement jetStreamManagement;
@@ -44,12 +51,14 @@
public NatsBrokerLifeCycleManager(
Configuration config,
JetStreamManagement jetStreamManagement,
- Set<TopicSubscriber> consumers,
- BrokerApi brokerApi) {
+ Connection connection,
+ BrokerApi brokerApi,
+ Set<TopicSubscriber> consumers) {
this.config = config;
this.jetStreamManagement = jetStreamManagement;
- this.consumers = consumers;
+ this.connection = connection;
this.brokerApi = brokerApi;
+ this.consumers = consumers;
}
@Override
@@ -96,6 +105,29 @@
@Override
public void stop() {
+ drainConnection();
brokerApi.disconnect();
+ try {
+ connection.close();
+ } catch (InterruptedException e) {
+ logger.at(Level.SEVERE).withCause(e).log(
+ "NATS broker - stopping connection failed with error");
+ }
+ }
+
+ private void drainConnection() {
+ try {
+ int shutdownTimeoutMs = config.getShutdownTimeoutMs();
+ Duration shutdownTimeout = Duration.ofMillis(shutdownTimeoutMs);
+ logger.atInfo().log(
+ "NATS consumer - Waiting up to '%s' milliseconds to drain connection to stream '%s'",
+ shutdownTimeoutMs, config.getStreamName());
+ CompletableFuture<Boolean> f = connection.drain(shutdownTimeout);
+ f.get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ logger.at(Level.WARNING).withCause(e).log("NATS broker - graceful shutdown failed with timeout");
+ } catch (InterruptedException | ExecutionException e) {
+ logger.at(Level.SEVERE).withCause(e).log("NATS broker - graceful shutdown failed with error");
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
index 52cf721..918a7ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
@@ -32,12 +32,10 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.ProviderException;
-import java.time.Duration;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.logging.Level;
// import software.amazon.kinesis.coordinator.Scheduler;
@@ -50,7 +48,7 @@
private final Configuration config;
private java.util.function.Consumer<Event> messageProcessor;
- private String streamName;
+ private String subjectName;
private AtomicBoolean resetOffset = new AtomicBoolean(false);
private final Connection connection;
private final JetStream jetStream;
@@ -61,25 +59,26 @@
@Inject
public NatsConsumer(
+ @EventGson Gson gson,
Configuration config,
Connection connection,
JetStream jetStream,
- JetStreamManagement jetStreamManagement,
- @EventGson Gson gson) {
+ JetStreamManagement jetStreamManagement) {
+ this.gson = gson;
this.config = config;
this.connection = connection;
- this.dispatcher = connection.createDispatcher();
this.jetStream = jetStream;
this.jetStreamManagement = jetStreamManagement;
- this.gson = gson;
+ // create one dispatcher per consumer
+ this.dispatcher = connection.createDispatcher();
}
- public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
- this.streamName = streamName;
+ public void subscribe(String subjectName, java.util.function.Consumer<Event> messageProcessor) {
+ this.subjectName = subjectName;
this.messageProcessor = messageProcessor;
- String subject = getOrCreateSubject(streamName);
- logger.atInfo().log("NATS consumer - subscribe to subject [%s]", streamName);
- runReceiver(messageProcessor, subject);
+ String subject = getOrCreateSubject(subjectName);
+ subscription = runReceiver(messageProcessor, subject);
+ logger.atInfo().log("NATS consumer - subscribed to subject [%s]", subjectName);
}
private String getOrCreateSubject(String name) {
@@ -107,49 +106,43 @@
name, sc.getName(), subjects);
}
- private void runReceiver(java.util.function.Consumer<Event> messageProcessor, String subject) {
+ private JetStreamSubscription runReceiver(
+ java.util.function.Consumer<Event> messageProcessor, String subject) {
try {
- this.subscription =
- jetStream.subscribe(
- subject,
- dispatcher,
- new MessageHandler() {
+ return jetStream.subscribe(
+ subject,
+ dispatcher,
+ new MessageHandler() {
- @Override
- public void onMessage(Message msg) throws InterruptedException {
- String json = new String(msg.getData(), StandardCharsets.UTF_8);
- Event e = gson.fromJson(json, Event.class);
- messageProcessor.accept(e);
- msg.ack();
- logger.atFine().log("NATS consumer - consumed and acked event '%s'", e);
- }
- },
- false);
+ @Override
+ public void onMessage(Message msg) throws InterruptedException {
+ String json = new String(msg.getData(), StandardCharsets.UTF_8);
+ Event e = gson.fromJson(json, Event.class);
+ messageProcessor.accept(e);
+ msg.ack();
+ logger.atFine().log("NATS consumer - consumed and acked event '%s'", e);
+ }
+ },
+ false);
} catch (IOException | JetStreamApiException e) {
logger.atSevere().withCause(e).log(
"NATS consumer - subscribing to subject [%s] failed", subject);
+ return null;
}
}
public void shutdown() {
try {
- int shutdownTimeoutMs = config.getShutdownTimeoutMs();
- Duration shutdownTimeout = Duration.ofMillis(shutdownTimeoutMs);
- logger.atInfo().log(
- "NATS consumer - Waiting up to '%s' milliseconds to complete shutdown of consumer of stream '%s'",
- shutdownTimeoutMs, getStreamName());
+ subscription.unsubscribe();
connection.closeDispatcher(dispatcher);
- CompletableFuture<Boolean> f = connection.drain(shutdownTimeout);
- connection.flush(shutdownTimeout);
- f.get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
- connection.close();
+ logger.at(Level.INFO).log(
+ "NATS consumer - consumer for subject '%s' was shutdown", subjectName);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
- "NATS consumer - error caught when shutting down consumer for stream %s",
+ "NATS consumer - error caught when shutting down consumer for subject %s",
getStreamName());
}
- logger.atInfo().log(
- "NATS consumer - shutdown consumer of stream %s completed.", getStreamName());
+ logger.atInfo().log("NATS consumer - shutdown consumer of subject %s completed.", subjectName);
}
public java.util.function.Consumer<Event> getMessageProcessor() {
@@ -157,7 +150,8 @@
}
public String getStreamName() {
- return streamName;
+ // we map BrokerApi streams to NATS subjects
+ return subjectName;
}
public void resetOffset() {