Allow specifying failoverTimeMillis property for Kinesis consumer
A worker which does not renew it's lease within this time interval
will be regarded as having problems and it's shards will be assigned
to other workers.
Increasing failoverTimeMillis is suggested by AWS Support Team
to avoid following issue:
```
java.lang.RuntimeException: java.lang.InterruptedException at
software.amazon.kinesis.retrieval.polling.KinesisDataFetcher.
getRecords(KinesisDataFetcher.java:336
```
Bug: Issue 287304308
Change-Id: I9dbb79399d17b01e09d4cb672e83915b1909ec31
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 31810bd..e0edc71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -37,6 +37,7 @@
private static final Integer DEFAULT_MAX_RECORDS = 100;
private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
private static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
+ private static final Long DEFAULT_CONSUMER_FAILOVER_TIME_MS = 10000L;
private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
private static final Long DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 60000L; // 5 min
@@ -59,6 +60,7 @@
private final Boolean sendAsync;
private final Optional<String> awsConfigurationProfileName;
private final Long publishRecordMaxBufferedTimeMs;
+ private final Long consumerFailoverTimeInMs;
@Inject
public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -98,6 +100,11 @@
.map(Long::parseLong)
.orElse(DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS);
+ this.consumerFailoverTimeInMs =
+ Optional.ofNullable(getStringParam(pluginConfig, "consumerFailoverTimeInMs", null))
+ .map(Long::parseLong)
+ .orElse(DEFAULT_CONSUMER_FAILOVER_TIME_MS);
+
this.publishTimeoutMs =
Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null))
.map(Long::parseLong)
@@ -168,6 +175,10 @@
return publishRecordMaxBufferedTimeMs;
}
+ public long getConsumerFailoverTimeInMs() {
+ return consumerFailoverTimeInMs;
+ }
+
public Long getPollingIntervalMs() {
return pollingIntervalMs;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 4c59b49..09083df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -67,6 +67,9 @@
cloudWatchAsyncClient,
String.format("klc-worker-%s-%s", configuration.getApplicationName(), streamName),
kinesisRecordProcessorFactory.create(messageProcessor));
+ configsBuilder
+ .leaseManagementConfig()
+ .failoverTimeMillis(configuration.getConsumerFailoverTimeInMs());
}
private RetrievalConfig getRetrievalConfig() {
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 66f516f..9758aab 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -88,6 +88,15 @@
for more details on this.
Default: 100
+`plugin.events-aws-kinesis.consumerFailoverTimeInMs`
+: Optional. Failover time in milliseconds. A worker which does not renew
+ it's lease within this time interval will be regarded as having problems
+ and it's shards will be assigned to other workers.
+
+ See [AWS docs](https://github.com/awslabs/amazon-kinesis-client/blob/v2.3.4/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L107)
+ for more details on this.
+ Default: 10000
+
`plugin.events-aws-kinesis.shutdownTimeoutMs`
: Optional. The maximum total time (milliseconds) waiting when shutting down
kinesis consumers.