Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Consume events-broker from source
  Allow specifying failoverTimeMillis property for Kinesis consumer
  Allow specifying RecordMaxBufferedTime property for Kinesis producer
  Fix issue with casting aws credentials providers
  Add AWS credentials profile name parameter

Change-Id: If3d0c4993bedb2ebb86a3d5ad1baba37b318a8f3
diff --git a/BUILD b/BUILD
index 2aa6678..1f6cdfa 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,7 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
+        ":events-broker-neverlink",
         "@amazon-auth//jar",
         "@amazon-aws-core//jar",
         "@amazon-cloudwatch//jar",
@@ -43,7 +44,6 @@
         "@awssdk-query-protocol//jar",
         "@commons-codec//jar",
         "@commons-lang//jar",
-        "@events-broker//jar:neverlink",
         "@io-netty-all//jar",
         "@jackson-annotations//jar",
         "@jackson-core//jar",
@@ -63,11 +63,11 @@
     tags = ["events-aws-kinesis"],
     deps = [
         ":events-aws-kinesis__plugin_test_deps",
+        "//plugins/events-broker",
         "@amazon-http-client-spi//jar",
         "@amazon-kinesis-client//jar",
         "@amazon-kinesis//jar",
         "@awssdk-kinesis-producer//jar",
-        "@events-broker//jar",
     ],
 )
 
@@ -96,3 +96,9 @@
         "@testcontainer-localstack//jar",
     ],
 )
+
+java_library(
+    name = "events-broker-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/events-broker"],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
index a32db2b..b7f3090 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,3 @@
 pluginPipeline(formatCheckId: 'gerritforge:plugins-events-aws-kinesis-code-style',
-                buildCheckId: 'gerritforge:plugins-events-aws-kinesis-build-test')
+                buildCheckId: 'gerritforge:plugins-events-aws-kinesis-build-test',
+                extraModules: ['events-broker'])
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 9e44a2e..65abd99 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -120,12 +120,6 @@
     )
 
     maven_jar(
-        name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.5.1",
-        sha1 = "78b8bc6ad7fd7caadcc1c6e3484332464de0ac38",
-    )
-
-    maven_jar(
         name = "io-netty-all",
         artifact = "io.netty:netty-all:4.1.51.Final",
         sha1 = "5e5f741acc4c211ac4572c31c7e5277ec465e4e4",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 307b130..89476cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -52,6 +52,8 @@
   static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
   static final Integer DEFAULT_MAX_RECORDS = 100;
   static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
+  static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
+  static final Long DEFAULT_CONSUMER_FAILOVER_TIME_MS = 10000L;
   static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
   static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
   static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
@@ -74,6 +76,9 @@
   private final Level awsLibLogLevel;
   private final Boolean sendAsync;
   private final Boolean sendStreamEvents;
+  private final Optional<String> awsConfigurationProfileName;
+  private final Long publishRecordMaxBufferedTimeMs;
+  private final Long consumerFailoverTimeInMs;
 
   @Inject
   public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -116,6 +121,16 @@
             .map(Long::parseLong)
             .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
 
+    this.publishRecordMaxBufferedTimeMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "recordMaxBufferedTimeMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS);
+
+    this.consumerFailoverTimeInMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "consumerFailoverTimeInMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_CONSUMER_FAILOVER_TIME_MS);
+
     this.publishTimeoutMs =
         Optional.ofNullable(getStringParam(pluginConfig, PUBLISH_TIMEOUT_MS_FIELD, null))
             .map(Long::parseLong)
@@ -141,13 +156,17 @@
             .map(Boolean::new)
             .orElse(DEFAULT_SEND_ASYNC);
 
+    this.awsConfigurationProfileName =
+        Optional.ofNullable(getStringParam(pluginConfig, "profileName", null));
+
     logger.atInfo().log(
-        "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s",
+        "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s%s",
         applicationName,
         pollingIntervalMs,
         maxRecords,
         region.map(r -> String.format("|region: %s", r.id())).orElse(""),
-        endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
+        endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""),
+        awsConfigurationProfileName.map(p -> String.format("|profile: %s", p)).orElse(""));
   }
 
   public String getStreamEventsTopic() {
@@ -178,6 +197,14 @@
     return publishSingleRequestTimeoutMs;
   }
 
+  public Long getPublishRecordMaxBufferedTimeMs() {
+    return publishRecordMaxBufferedTimeMs;
+  }
+
+  public long getConsumerFailoverTimeInMs() {
+    return consumerFailoverTimeInMs;
+  }
+
   public Long getPollingIntervalMs() {
     return pollingIntervalMs;
   }
@@ -190,6 +217,10 @@
     return initialPosition;
   }
 
+  public Optional<String> getAwsConfigurationProfileName() {
+    return awsConfigurationProfileName;
+  }
+
   private static String getStringParam(
       PluginConfig pluginConfig, String name, String defaultValue) {
     return Strings.isNullOrEmpty(System.getProperty(name))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
index f3812df..d55dab2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
@@ -17,6 +17,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
 
@@ -35,6 +36,11 @@
     configuration.getRegion().ifPresent(builder::region);
     configuration.getEndpoint().ifPresent(builder::endpointOverride);
 
+    configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(
+            profile -> builder.credentialsProvider(ProfileCredentialsProvider.create(profile)));
+
     return builder.build();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
index c983d60..681bb0a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
@@ -17,6 +17,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
 import software.amazon.kinesis.common.KinesisClientUtil;
@@ -35,7 +36,10 @@
     KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
     configuration.getRegion().ifPresent(builder::region);
     configuration.getEndpoint().ifPresent(builder::endpointOverride);
-
+    configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(
+            profile -> builder.credentialsProvider(ProfileCredentialsProvider.create(profile)));
     return KinesisClientUtil.createKinesisAsyncClient(builder);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
index 5c5dc52..b13bd0c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kinesis;
 
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.google.common.flogger.FluentLogger;
@@ -40,11 +41,15 @@
         new KinesisProducerConfiguration()
             .setAggregationEnabled(false)
             .setMaxConnections(1)
-            .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs());
+            .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs())
+            .setRecordMaxBufferedTime(configuration.getPublishRecordMaxBufferedTimeMs());
 
     conf.setRegion(configuration.getRegion().orElseGet(regionProvider::getRegion).toString());
 
     configuration
+        .getAwsConfigurationProfileName()
+        .ifPresent(profile -> conf.setCredentialsProvider(new ProfileCredentialsProvider(profile)));
+    configuration
         .getEndpoint()
         .ifPresent(
             uri ->
@@ -54,12 +59,16 @@
                     .setCloudwatchPort(uri.getPort())
                     .setVerifyCertificate(false));
     logger.atInfo().log(
-        "Kinesis producer configured. Request Timeout (ms):'%s'%s%s",
+        "Kinesis producer configured. Request Timeout (ms):'%s'%s%s%s",
         configuration.getPublishSingleRequestTimeoutMs(),
         String.format("|region: '%s'", conf.getRegion()),
         configuration
             .getEndpoint()
             .map(e -> String.format("|endpoint: '%s'", e.toASCIIString()))
+            .orElse(""),
+        configuration
+            .getAwsConfigurationProfileName()
+            .map(p -> String.format("|profile: '%s'", p))
             .orElse(""));
 
     return new KinesisProducer(conf);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 4c59b49..09083df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -67,6 +67,9 @@
             cloudWatchAsyncClient,
             String.format("klc-worker-%s-%s", configuration.getApplicationName(), streamName),
             kinesisRecordProcessorFactory.create(messageProcessor));
+    configsBuilder
+        .leaseManagementConfig()
+        .failoverTimeMillis(configuration.getConsumerFailoverTimeInMs());
   }
 
   private RetrievalConfig getRetrievalConfig() {
diff --git a/src/main/resources/Documentation/Build.md b/src/main/resources/Documentation/Build.md
index ffb8b0c..f2bb4d7 100644
--- a/src/main/resources/Documentation/Build.md
+++ b/src/main/resources/Documentation/Build.md
@@ -2,7 +2,8 @@
 
 The events-aws-kinesis plugin can be build as a regular 'in-tree' plugin. That means
 that is required to clone a Gerrit source tree first and then to have the plugin
-source directory into the `/plugins` path.
+source directory into the `/plugins` path. The plugin depends on [events-broker](https://gerrit.googlesource.com/modules/events-broker)
+which is linked directly from source with the same 'in-tree' plugin structure.
 
 Additionally, the `plugins/external_plugin_deps.bzl` file needs to be updated to
 match the events-aws-kinesis plugin one.
@@ -11,6 +12,7 @@
 git clone --recursive https://gerrit.googlesource.com/gerrit
 cd gerrit
 git clone "https://gerrit.googlesource.com/plugins/events-aws-kinesis" plugins/events-aws-kinesis
+git clone "https://gerrit.googlesource.com/modules/events-broker" plugins/events-broker
 ln -sf plugins/events-aws-kinesis/external_plugin_deps.bzl plugins/.
 bazelisk build plugins/events-aws-kinesis
 ```
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index c6565f1..f3e5ffa 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -76,6 +76,27 @@
   If it goes over, the request will be timed-out and not attempted again.
   Default: 6000
 
+`plugin.events-aws-kinesis.recordMaxBufferedTimeMs`
+: Optional. Maximum amount of time (milliseconds) a record may spend being buffered
+  before it gets sent. Records may be sent sooner than this depending on the
+  other buffering limits.
+
+  This setting provides coarse ordering among records - any two records will
+  be reordered by no more than twice this amount (assuming no failures and
+  retries and equal network latency).
+  See [AWS docs](https://github.com/awslabs/amazon-kinesis-producer/blob/v0.14.6/java/amazon-kinesis-producer-sample/default_config.properties#L239)
+  for more details on this.
+  Default: 100
+
+`plugin.events-aws-kinesis.consumerFailoverTimeInMs`
+: Optional. Failover time in milliseconds. A worker which does not renew
+  its lease within this time interval will be regarded as having problems
+  and its shards will be assigned to other workers.
+
+  See [AWS docs](https://github.com/awslabs/amazon-kinesis-client/blob/v2.3.4/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L107)
+  for more details on this.
+  Default: 10000
+
 `plugin.events-aws-kinesis.shutdownTimeoutMs`
 : Optional. The maximum total time (milliseconds) waiting when shutting down
   kinesis consumers.
@@ -113,6 +134,12 @@
     The overall result of the operation, once available, will be logged.
     Default: true
 
+`plugin.events-aws-kinesis.profileName`
+:   Optional. The name of the AWS configuration and credentials profile used to
+    connect to Kinesis. See [Configuration and credential file settings](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).
+    Default: When not specified, credentials are provided via the Default Credentials
+    Provider Chain, as explained [here](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html).
+
 Overrides
 =========================
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
index a83b04b..9ce2cba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
@@ -19,6 +19,7 @@
 
 import com.google.gerrit.server.config.PluginConfig;
 import com.google.gerrit.server.config.PluginConfigFactory;
+import java.util.Optional;
 import org.apache.log4j.Level;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
@@ -112,4 +113,27 @@
 
     assertThat(configuration.isSendStreamEvents()).isEqualTo(false);
   }
+
+  @Test
+  public void shouldReturnAWSProfileNameWhenConfigured() {
+    String awsProfileName = "aws_profile_name";
+    pluginConfig.setString("profileName", awsProfileName);
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+    Optional<String> profileName = configuration.getAwsConfigurationProfileName();
+    assertThat(profileName.isPresent()).isTrue();
+    assertThat(profileName.get()).isEqualTo(awsProfileName);
+  }
+
+  @Test
+  public void shouldSkipAWSProfileNameWhenNotConfigured() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+    Optional<String> profileName = configuration.getAwsConfigurationProfileName();
+    assertThat(profileName.isPresent()).isFalse();
+  }
 }