diff --git a/BUILD b/BUILD
index 2aa6678..1f6cdfa 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,7 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
+        ":events-broker-neverlink",
         "@amazon-auth//jar",
         "@amazon-aws-core//jar",
         "@amazon-cloudwatch//jar",
@@ -43,7 +44,6 @@
         "@awssdk-query-protocol//jar",
         "@commons-codec//jar",
         "@commons-lang//jar",
-        "@events-broker//jar:neverlink",
         "@io-netty-all//jar",
         "@jackson-annotations//jar",
         "@jackson-core//jar",
@@ -63,11 +63,11 @@
     tags = ["events-aws-kinesis"],
     deps = [
         ":events-aws-kinesis__plugin_test_deps",
+        "//plugins/events-broker",
         "@amazon-http-client-spi//jar",
         "@amazon-kinesis-client//jar",
         "@amazon-kinesis//jar",
         "@awssdk-kinesis-producer//jar",
-        "@events-broker//jar",
     ],
 )
 
@@ -96,3 +96,9 @@
         "@testcontainer-localstack//jar",
     ],
 )
+
+java_library(
+    name = "events-broker-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/events-broker"],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
index a32db2b..b7f3090 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,3 @@
 pluginPipeline(formatCheckId: 'gerritforge:plugins-events-aws-kinesis-code-style',
-                buildCheckId: 'gerritforge:plugins-events-aws-kinesis-build-test')
+                buildCheckId: 'gerritforge:plugins-events-aws-kinesis-build-test',
+                extraModules: ['events-broker'])
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 253b5a1..65abd99 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -120,12 +120,6 @@
     )
 
     maven_jar(
-        name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.6.0-rc3",
-        sha1 = "cb398afa4f76367be5c62b99a7ffce74ae1d3d8b",
-    )
-
-    maven_jar(
         name = "io-netty-all",
         artifact = "io.netty:netty-all:4.1.51.Final",
         sha1 = "5e5f741acc4c211ac4572c31c7e5277ec465e4e4",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 307b130..89476cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -52,6 +52,8 @@
   static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
   static final Integer DEFAULT_MAX_RECORDS = 100;
   static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
+  static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
+  static final Long DEFAULT_CONSUMER_FAILOVER_TIME_MS = 10000L;
   static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
   static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
   static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
@@ -74,6 +76,9 @@
   private final Level awsLibLogLevel;
   private final Boolean sendAsync;
   private final Boolean sendStreamEvents;
+  private final Optional<String> awsConfigurationProfileName;
+  private final Long publishRecordMaxBufferedTimeMs;
+  private final Long consumerFailoverTimeInMs;
 
   @Inject
   public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -116,6 +121,16 @@
             .map(Long::parseLong)
             .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
 
+    this.publishRecordMaxBufferedTimeMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "recordMaxBufferedTimeMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS);
+
+    this.consumerFailoverTimeInMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "consumerFailoverTimeInMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_CONSUMER_FAILOVER_TIME_MS);
+
     this.publishTimeoutMs =
         Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null))
             .map(Long::parseLong)
@@ -141,13 +156,17 @@
             .map(Boolean::new)
             .orElse(DEFAULT_SEND_ASYNC);
 
+    this.awsConfigurationProfileName =
+        Optional.ofNullable(getStringParam(pluginConfig, "profileName", null));
+
     logger.atInfo().log(
-        "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s",
+        "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s%s",
         applicationName,
         pollingIntervalMs,
         maxRecords,
         region.map(r -> String.format("|region: %s", r.id())).orElse(""),
-        endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
+        endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""),
+        awsConfigurationProfileName.map(p -> String.format("|profile: %s", p)).orElse(""));
   }
 
   public String getStreamEventsTopic() {
@@ -178,6 +197,14 @@
     return publishSingleRequestTimeoutMs;
   }
 
+  public Long getPublishRecordMaxBufferedTimeMs() {
+    return publishRecordMaxBufferedTimeMs;
+  }
+
+  public long getConsumerFailoverTimeInMs() {
+    return consumerFailoverTimeInMs;
+  }
+
   public Long getPollingIntervalMs() {
     return pollingIntervalMs;
   }
@@ -190,6 +217,10 @@
     return initialPosition;
   }
 
+  public Optional<String> getAwsConfigurationProfileName() {
+    return awsConfigurationProfileName;
+  }
+
   private static String getStringParam(
       PluginConfig pluginConfig, String name, String defaultValue) {
     return Strings.isNullOrEmpty(System.getProperty(name))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
index f3812df..d55dab2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
@@ -17,6 +17,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
 
@@ -35,6 +36,11 @@
     configuration.getRegion().ifPresent(builder::region);
     configuration.getEndpoint().ifPresent(builder::endpointOverride);
 
+    configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(
+            profile -> builder.credentialsProvider(ProfileCredentialsProvider.create(profile)));
+
     return builder.build();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
index c983d60..681bb0a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
@@ -17,6 +17,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
 import software.amazon.kinesis.common.KinesisClientUtil;
@@ -35,7 +36,10 @@
     KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
     configuration.getRegion().ifPresent(builder::region);
     configuration.getEndpoint().ifPresent(builder::endpointOverride);
-
+    configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(
+            profile -> builder.credentialsProvider(ProfileCredentialsProvider.create(profile)));
     return KinesisClientUtil.createKinesisAsyncClient(builder);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
index 5c5dc52..b13bd0c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.google.common.flogger.FluentLogger;
@@ -40,11 +41,15 @@
         new KinesisProducerConfiguration()
             .setAggregationEnabled(false)
             .setMaxConnections(1)
-            .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs());
+            .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs())
+            .setRecordMaxBufferedTime(configuration.getPublishRecordMaxBufferedTimeMs());
 
     conf.setRegion(configuration.getRegion().orElseGet(regionProvider::getRegion).toString());
 
     configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(profile -> conf.setCredentialsProvider(new ProfileCredentialsProvider(profile)));
+    configuration
         .getEndpoint()
         .ifPresent(
             uri ->
@@ -54,12 +59,16 @@
                     .setCloudwatchPort(uri.getPort())
                     .setVerifyCertificate(false));
     logger.atInfo().log(
-        "Kinesis producer configured. Request Timeout (ms):'%s'%s%s",
+        "Kinesis producer configured. Request Timeout (ms):'%s'%s%s%s",
         configuration.getPublishSingleRequestTimeoutMs(),
         String.format("|region: '%s'", conf.getRegion()),
         configuration
             .getEndpoint()
             .map(e -> String.format("|endpoint: '%s'", e.toASCIIString()))
+            .orElse(""),
+        configuration
+            .getAwsConfigurationProfileName()
+            .map(p -> String.format("|profile: '%s'", p))
             .orElse(""));
 
     return new KinesisProducer(conf);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index 598aa6c..b794aaf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -122,7 +122,7 @@
   }
 
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
-    logger.atInfo().log("Checkpointing shard: " + kinesisShardId);
+    logger.atInfo().log("Checkpointing shard: %s", kinesisShardId);
     try {
       checkpointer.checkpoint();
     } catch (ShutdownException se) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 4c59b49..09083df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -67,6 +67,9 @@
             cloudWatchAsyncClient,
             String.format("klc-worker-%s-%s", configuration.getApplicationName(), streamName),
             kinesisRecordProcessorFactory.create(messageProcessor));
+    configsBuilder
+        .leaseManagementConfig()
+        .failoverTimeMillis(configuration.getConsumerFailoverTimeInMs());
   }
 
   private RetrievalConfig getRetrievalConfig() {
diff --git a/src/main/resources/Documentation/Build.md b/src/main/resources/Documentation/Build.md
index ffb8b0c..f2bb4d7 100644
--- a/src/main/resources/Documentation/Build.md
+++ b/src/main/resources/Documentation/Build.md
@@ -2,7 +2,8 @@
 
 The events-aws-kinesis plugin can be build as a regular 'in-tree' plugin. That means
 that is required to clone a Gerrit source tree first and then to have the plugin
-source directory into the `/plugins` path.
+source directory into the `/plugins` path. The plugin depends on [events-broker](https://gerrit.googlesource.com/modules/events-broker)
+which is linked directly from source with the same 'in-tree' plugin structure.
 
 Additionally, the `plugins/external_plugin_deps.bzl` file needs to be updated to
 match the events-aws-kinesis plugin one.
@@ -11,6 +12,7 @@
 git clone --recursive https://gerrit.googlesource.com/gerrit
 cd gerrit
 git clone "https://gerrit.googlesource.com/plugins/events-aws-kinesis" plugins/events-aws-kinesis
+git clone "https://gerrit.googlesource.com/modules/events-broker" plugins/events-broker
 ln -sf plugins/events-aws-kinesis/external_plugin_deps.bzl plugins/.
 bazelisk build plugins/events-aws-kinesis
 ```
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index c6565f1..f3e5ffa 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -76,6 +76,27 @@
   If it goes over, the request will be timed-out and not attempted again.
   Default: 6000
 
+`plugin.events-aws-kinesis.recordMaxBufferedTimeMs`
+: Optional. Maximum amount of time (milliseconds) a record may spend being buffered
+  before it gets sent. Records may be sent sooner than this depending on the
+  other buffering limits.
+
+  This setting provides coarse ordering among records - any two records will
+  be reordered by no more than twice this amount (assuming no failures and
+  retries and equal network latency).
+  See [AWS docs](https://github.com/awslabs/amazon-kinesis-producer/blob/v0.14.6/java/amazon-kinesis-producer-sample/default_config.properties#L239)
+  for more details on this.
+  Default: 100
+
+`plugin.events-aws-kinesis.consumerFailoverTimeInMs`
+: Optional. Failover time in milliseconds. A worker which does not renew
+  it's lease within this time interval will be regarded as having problems
+  and it's shards will be assigned to other workers.
+
+  See [AWS docs](https://github.com/awslabs/amazon-kinesis-client/blob/v2.3.4/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L107)
+  for more details on this.
+  Default: 10000
+
 `plugin.events-aws-kinesis.shutdownTimeoutMs`
 : Optional. The maximum total time (milliseconds) waiting when shutting down
   kinesis consumers.
@@ -113,6 +134,12 @@
     The overall result of the operation, once available, will be logged.
     Default: true
 
+`plugin.events-aws-kinesis.profileName`
+:   Optional. The name of the aws configuration and credentials profile used to
+    connect to the Kinesis. See [Configuration and credential file settings](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
+    Default: When not specified credentials are provided via the Default Credentials
+    Provider Chain, as explained [here](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html)
+
 Overrides
 =========================
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
index a83b04b..9ce2cba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
@@ -19,6 +19,7 @@
 
 import com.google.gerrit.server.config.PluginConfig;
 import com.google.gerrit.server.config.PluginConfigFactory;
+import java.util.Optional;
 import org.apache.log4j.Level;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
@@ -112,4 +113,27 @@
 
     assertThat(configuration.isSendStreamEvents()).isEqualTo(false);
   }
+
+  @Test
+  public void shouldReturnAWSProfileNameWhenConfigured() {
+    String awsProfileName = "aws_profile_name";
+    pluginConfig.setString("profileName", awsProfileName);
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+    Optional<String> profileName = configuration.getAwsConfigurationProfileName();
+    assertThat(profileName.isPresent()).isTrue();
+    assertThat(profileName.get()).isEqualTo(awsProfileName);
+  }
+
+  @Test
+  public void shouldSkipAWSProfileNameWhenNotConfigured() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+    Optional<String> profileName = configuration.getAwsConfigurationProfileName();
+    assertThat(profileName.isPresent()).isFalse();
+  }
 }
