Fix shutdown of consumers, NatsBrokerApi and the NATS connection

Do the shutdown in the following steps to implement graceful shutdown:
- drain connection, this drains all consumers stopping incoming
  messages
- disconnect the broker API, unsubscribing consumers and stopping
  dispatchers
- close the connection
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
index b05ecdd..838f9ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/Configuration.java
@@ -79,7 +79,7 @@
             .maxAckPending(pluginConfig.getLong("maxAckPending", 1000L))
             .deliverPolicy(DeliverPolicy.All)
             .buildPushSubscribeOptions();
-    shutdownTimeoutMs = pluginConfig.getInt("shutdownTimeoutMs", 1000);
+    shutdownTimeoutMs = pluginConfig.getInt("shutdownTimeoutMs", 30000);
 
     logger.atInfo().log(
         "NATS client configuration: sendStreamEvents: %b, streamEventsSubject: %s, natsOptions: %s, publishOptions: %s",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
index 7c286e0..bfd8512 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsBrokerLifeCycleManager.java
@@ -20,6 +20,7 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import io.nats.client.Connection;
 import io.nats.client.JetStreamApiException;
 import io.nats.client.JetStreamManagement;
 import io.nats.client.api.StorageType;
@@ -27,8 +28,13 @@
 import io.nats.client.api.StreamInfo;
 import java.io.IOException;
 import java.security.ProviderException;
+import java.time.Duration;
 import java.time.format.DateTimeFormatter;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
 @Singleton
@@ -36,6 +42,7 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Configuration config;
+  private final Connection connection;
   private final Set<TopicSubscriber> consumers;
   private final BrokerApi brokerApi;
   private JetStreamManagement jetStreamManagement;
@@ -44,12 +51,14 @@
   public NatsBrokerLifeCycleManager(
       Configuration config,
       JetStreamManagement jetStreamManagement,
-      Set<TopicSubscriber> consumers,
-      BrokerApi brokerApi) {
+      Connection connection,
+      BrokerApi brokerApi,
+      Set<TopicSubscriber> consumers) {
     this.config = config;
     this.jetStreamManagement = jetStreamManagement;
-    this.consumers = consumers;
+    this.connection = connection;
     this.brokerApi = brokerApi;
+    this.consumers = consumers;
   }
 
   @Override
@@ -96,6 +105,29 @@
 
   @Override
   public void stop() {
+    drainConnection();
     brokerApi.disconnect();
+    try {
+      connection.close();
+    } catch (InterruptedException e) {
+      logger.at(Level.SEVERE).withCause(e).log(
+          "NATS broker - stopping connection failed with error");
+    }
+  }
+
+  private void drainConnection() {
+    try {
+      int shutdownTimeoutMs = config.getShutdownTimeoutMs();
+      Duration shutdownTimeout = Duration.ofMillis(shutdownTimeoutMs);
+      logger.atInfo().log(
+          "NATS consumer - Waiting up to '%s' milliseconds to drain connection to stream '%s'",
+          shutdownTimeoutMs, config.getStreamName());
+      CompletableFuture<Boolean> f = connection.drain(shutdownTimeout);
+      f.get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      logger.at(Level.WARNING).withCause(e).log("NATS broker - graceful shutdown failed with timeout");
+    } catch (InterruptedException | ExecutionException e) {
+      logger.at(Level.SEVERE).withCause(e).log("NATS broker - graceful shutdown failed with error");
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
index 52cf721..918a7ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
@@ -32,12 +32,10 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.ProviderException;
-import java.time.Duration;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.logging.Level;
 
 // import software.amazon.kinesis.coordinator.Scheduler;
 
@@ -50,7 +48,7 @@
   private final Configuration config;
 
   private java.util.function.Consumer<Event> messageProcessor;
-  private String streamName;
+  private String subjectName;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
   private final Connection connection;
   private final JetStream jetStream;
@@ -61,25 +59,26 @@
 
   @Inject
   public NatsConsumer(
+      @EventGson Gson gson,
       Configuration config,
       Connection connection,
       JetStream jetStream,
-      JetStreamManagement jetStreamManagement,
-      @EventGson Gson gson) {
+      JetStreamManagement jetStreamManagement) {
+    this.gson = gson;
     this.config = config;
     this.connection = connection;
-    this.dispatcher = connection.createDispatcher();
     this.jetStream = jetStream;
     this.jetStreamManagement = jetStreamManagement;
-    this.gson = gson;
+    // create one dispatcher per consumer
+    this.dispatcher = connection.createDispatcher();
   }
 
-  public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
-    this.streamName = streamName;
+  public void subscribe(String subjectName, java.util.function.Consumer<Event> messageProcessor) {
+    this.subjectName = subjectName;
     this.messageProcessor = messageProcessor;
-    String subject = getOrCreateSubject(streamName);
-    logger.atInfo().log("NATS consumer - subscribe to subject [%s]", streamName);
-    runReceiver(messageProcessor, subject);
+    String subject = getOrCreateSubject(subjectName);
+    subscription = runReceiver(messageProcessor, subject);
+    logger.atInfo().log("NATS consumer - subscribed to subject [%s]", subjectName);
   }
 
   private String getOrCreateSubject(String name) {
@@ -107,49 +106,43 @@
         name, sc.getName(), subjects);
   }
 
-  private void runReceiver(java.util.function.Consumer<Event> messageProcessor, String subject) {
+  private JetStreamSubscription runReceiver(
+      java.util.function.Consumer<Event> messageProcessor, String subject) {
     try {
-      this.subscription =
-          jetStream.subscribe(
-              subject,
-              dispatcher,
-              new MessageHandler() {
+      return jetStream.subscribe(
+          subject,
+          dispatcher,
+          new MessageHandler() {
 
-                @Override
-                public void onMessage(Message msg) throws InterruptedException {
-                  String json = new String(msg.getData(), StandardCharsets.UTF_8);
-                  Event e = gson.fromJson(json, Event.class);
-                  messageProcessor.accept(e);
-                  msg.ack();
-                  logger.atFine().log("NATS consumer - consumed and acked event '%s'", e);
-                }
-              },
-              false);
+            @Override
+            public void onMessage(Message msg) throws InterruptedException {
+              String json = new String(msg.getData(), StandardCharsets.UTF_8);
+              Event e = gson.fromJson(json, Event.class);
+              messageProcessor.accept(e);
+              msg.ack();
+              logger.atFine().log("NATS consumer - consumed and acked event '%s'", e);
+            }
+          },
+          false);
     } catch (IOException | JetStreamApiException e) {
       logger.atSevere().withCause(e).log(
           "NATS consumer - subscribing to subject [%s] failed", subject);
+      return null;
     }
   }
 
   public void shutdown() {
     try {
-      int shutdownTimeoutMs = config.getShutdownTimeoutMs();
-      Duration shutdownTimeout = Duration.ofMillis(shutdownTimeoutMs);
-      logger.atInfo().log(
-          "NATS consumer - Waiting up to '%s' milliseconds to complete shutdown of consumer of stream '%s'",
-          shutdownTimeoutMs, getStreamName());
+      subscription.unsubscribe();
       connection.closeDispatcher(dispatcher);
-      CompletableFuture<Boolean> f = connection.drain(shutdownTimeout);
-      connection.flush(shutdownTimeout);
-      f.get(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
-      connection.close();
+      logger.at(Level.INFO).log(
+          "NATS consumer - consumer for subject '%s' was shutdown", subjectName);
     } catch (Exception e) {
       logger.atSevere().withCause(e).log(
-          "NATS consumer - error caught when shutting down consumer for stream %s",
+          "NATS consumer - error caught when shutting down consumer for subject %s",
           getStreamName());
     }
-    logger.atInfo().log(
-        "NATS consumer - shutdown consumer of stream %s completed.", getStreamName());
+    logger.atInfo().log("NATS consumer - shutdown consumer of subject %s completed.", subjectName);
   }
 
   public java.util.function.Consumer<Event> getMessageProcessor() {
@@ -157,7 +150,8 @@
   }
 
   public String getStreamName() {
-    return streamName;
+    // we map BrokerApi streams to NATS subjects
+    return subjectName;
   }
 
   public void resetOffset() {