Allow specifying RecordMaxBufferedTime property for Kinesis producer
RecordMaxBufferedTime property defines the maximum amount of time
(milliseconds) a record may spend being buffered. Increasing that
value might improve the performance of message producer.
Increasing RecordMaxBufferedTime is suggested by AWS Support Team
to avoid following issue:
`PutRecords processing time is taking longer than 500 ms to complete.`
Change-Id: Ie031fc74b9b857830b5525b2bfce725ccd7547f3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 34a1a8c..31810bd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -36,6 +36,7 @@
private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
private static final Integer DEFAULT_MAX_RECORDS = 100;
private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
+ private static final Long DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS = 100L;
private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
private static final Long DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 60000L; // 5 min
@@ -57,6 +58,7 @@
private final Level awsLibLogLevel;
private final Boolean sendAsync;
private final Optional<String> awsConfigurationProfileName;
+ private final Long publishRecordMaxBufferedTimeMs;
@Inject
public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -91,6 +93,11 @@
.map(Long::parseLong)
.orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
+ this.publishRecordMaxBufferedTimeMs =
+ Optional.ofNullable(getStringParam(pluginConfig, "recordMaxBufferedTimeMs", null))
+ .map(Long::parseLong)
+ .orElse(DEFAULT_PUBLISH_RECORD_MAX_BUFFERED_TIME_MS);
+
this.publishTimeoutMs =
Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null))
.map(Long::parseLong)
@@ -157,6 +164,10 @@
return publishSingleRequestTimeoutMs;
}
+ public Long getPublishRecordMaxBufferedTimeMs() {
+ return publishRecordMaxBufferedTimeMs;
+ }
+
public Long getPollingIntervalMs() {
return pollingIntervalMs;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
index 341d650..b13bd0c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
@@ -41,7 +41,8 @@
new KinesisProducerConfiguration()
.setAggregationEnabled(false)
.setMaxConnections(1)
- .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs());
+ .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs())
+ .setRecordMaxBufferedTime(configuration.getPublishRecordMaxBufferedTimeMs());
conf.setRegion(configuration.getRegion().orElseGet(regionProvider::getRegion).toString());
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 3562be2..66f516f 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -76,6 +76,18 @@
If it goes over, the request will be timed-out and not attempted again.
Default: 6000
+`plugin.events-aws-kinesis.recordMaxBufferedTimeMs`
+: Optional. Maximum amount of time (milliseconds) a record may spend being buffered
+ before it gets sent. Records may be sent sooner than this depending on the
+ other buffering limits.
+
+ This setting provides coarse ordering among records - any two records will
+ be reordered by no more than twice this amount (assuming no failures and
+ retries and equal network latency).
+ See [AWS docs](https://github.com/awslabs/amazon-kinesis-producer/blob/v0.14.6/java/amazon-kinesis-producer-sample/default_config.properties#L239)
+ for more details on this.
+ Default: 100
+
`plugin.events-aws-kinesis.shutdownTimeoutMs`
: Optional. The maximum total time (milliseconds) waiting when shutting down
kinesis consumers.