Merge branch 'stable-3.4' into stable-3.5
* stable-3.4:
Remove logging unused format arguments
Update checkpoint on regular bases
Change-Id: I44474451859b96c6aace974e6274b1df2c466686
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
index 2a15d2c..307b130 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -57,6 +57,7 @@
static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
static final Boolean DEFAULT_SEND_ASYNC = true;
static final Boolean DEFAULT_SEND_STREAM_EVENTS = false;
+ static final Long DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 60000L; // 5 min
private final String applicationName;
private final String streamEventsTopic;
@@ -69,6 +70,7 @@
private final Long publishTimeoutMs;
private final Long publishSingleRequestTimeoutMs;
private final Long shutdownTimeoutMs;
+ private final Long checkpointIntervalMs;
private final Level awsLibLogLevel;
private final Boolean sendAsync;
private final Boolean sendStreamEvents;
@@ -124,6 +126,11 @@
.map(Long::parseLong)
.orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS);
+ this.checkpointIntervalMs =
+ Optional.ofNullable(getStringParam(pluginConfig, "checkpointIntervalMs", null))
+ .map(Long::parseLong)
+ .orElse(DEFAULT_CHECKPOINT_INTERVAL_MS);
+
this.awsLibLogLevel =
Optional.ofNullable(getStringParam(pluginConfig, AWS_LIB_LOG_LEVEL_FIELD, null))
.map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL))
@@ -198,6 +205,10 @@
return shutdownTimeoutMs;
}
+ public Long getCheckpointIntervalMs() {
+ return checkpointIntervalMs;
+ }
+
public Level getAwsLibLogLevel() {
return awsLibLogLevel;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index 2786b82..598aa6c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -24,11 +24,13 @@
import java.util.function.Consumer;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
+import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
+import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
class KinesisRecordProcessor implements ShardRecordProcessor {
@@ -40,21 +42,29 @@
private final Consumer<Event> recordProcessor;
private final OneOffRequestContext oneOffCtx;
private final EventDeserializer eventDeserializer;
+ private final Configuration configuration;
+
+ private long nextCheckpointTimeInMillis;
+ private String kinesisShardId;
@Inject
KinesisRecordProcessor(
@Assisted Consumer<Event> recordProcessor,
OneOffRequestContext oneOffCtx,
- EventDeserializer eventDeserializer) {
+ EventDeserializer eventDeserializer,
+ Configuration configuration) {
this.recordProcessor = recordProcessor;
this.oneOffCtx = oneOffCtx;
this.eventDeserializer = eventDeserializer;
+ this.configuration = configuration;
}
@Override
public void initialize(InitializationInput initializationInput) {
+ kinesisShardId = initializationInput.shardId();
logger.atInfo().log(
"Initializing @ Sequence: %s", initializationInput.extendedSequenceNumber());
+ setNextCheckpointTime();
}
@Override
@@ -79,11 +89,21 @@
logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
}
});
+
+ if (System.currentTimeMillis() >= nextCheckpointTimeInMillis) {
+ checkpoint(processRecordsInput.checkpointer());
+ setNextCheckpointTime();
+ }
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Caught throwable while processing records. Aborting.");
}
}
+ private void setNextCheckpointTime() {
+ nextCheckpointTimeInMillis =
+ System.currentTimeMillis() + configuration.getCheckpointIntervalMs();
+ }
+
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
logger.atInfo().log("Lost lease, so terminating.");
@@ -91,22 +111,27 @@
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
- try {
- logger.atInfo().log("Reached shard end checkpointing.");
- shardEndedInput.checkpointer().checkpoint();
- } catch (ShutdownException | InvalidStateException e) {
- logger.atSevere().withCause(e).log("Exception while checkpointing at shard end. Giving up.");
- }
+ logger.atInfo().log("Reached shard end checkpointing.");
+ checkpoint(shardEndedInput.checkpointer());
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
+ logger.atInfo().log("Scheduler is shutting down, checkpointing.");
+ checkpoint(shutdownRequestedInput.checkpointer());
+ }
+
+ private void checkpoint(RecordProcessorCheckpointer checkpointer) {
+ logger.atInfo().log("Checkpointing shard: " + kinesisShardId);
try {
- logger.atInfo().log("Scheduler is shutting down, checkpointing.");
- shutdownRequestedInput.checkpointer().checkpoint();
- } catch (ShutdownException | InvalidStateException e) {
+ checkpointer.checkpoint();
+ } catch (ShutdownException se) {
+ logger.atSevere().withCause(se).log("Caught shutdown exception, skipping checkpoint.");
+ } catch (ThrottlingException e) {
+ logger.atSevere().withCause(e).log("Caught throttling exception, skipping checkpoint.");
+ } catch (InvalidStateException e) {
logger.atSevere().withCause(e).log(
- "Exception while checkpointing at requested shutdown. Giving up.");
+ "Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.");
}
}
}
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
index 0683975..c6565f1 100644
--- a/src/main/resources/Documentation/Config.md
+++ b/src/main/resources/Documentation/Config.md
@@ -81,6 +81,10 @@
kinesis consumers.
Default: 20000
+`plugin.events-aws-kinesis.checkpointIntervalMs`
+: Optional. The interval between checkpoints (milliseconds).
+Default: 300000 (5 minutes)
+
`plugin.events-aws-kinesis.awsLibLogLevel`
: Optional. Which level AWS libraries should log at.
This plugin delegates most complex tasks associated to the production and
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index d488ab8..24027fc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -38,11 +38,14 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
+import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class)
public class KinesisRecordProcessorTest {
@@ -54,11 +57,41 @@
@Captor ArgumentCaptor<Event> eventMessageCaptor;
@Mock OneOffRequestContext oneOffCtx;
@Mock ManualRequestContext requestContext;
+ @Mock Configuration configuration;
@Before
public void setup() {
when(oneOffCtx.open()).thenReturn(requestContext);
- objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer);
+ objectUnderTest =
+ new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer, configuration);
+ }
+
+ @Test
+ public void shouldNotCheckpointBeforeIntervalIsExpired() {
+ when(configuration.getCheckpointIntervalMs()).thenReturn(10000L);
+ Event event = new ProjectCreatedEvent();
+
+ initializeRecordProcessor();
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ ProcessRecordsInput processRecordsInputSpy = Mockito.spy(kinesisInput);
+ objectUnderTest.processRecords(processRecordsInputSpy);
+
+ verify(processRecordsInputSpy, never()).checkpointer();
+ }
+
+ @Test
+ public void shouldCheckpointAfterIntervalIsExpired() throws InterruptedException {
+ when(configuration.getCheckpointIntervalMs()).thenReturn(0L);
+ Event event = new ProjectCreatedEvent();
+
+ initializeRecordProcessor();
+
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
+ ProcessRecordsInput processRecordsInputSpy = Mockito.spy(kinesisInput);
+ objectUnderTest.processRecords(processRecordsInputSpy);
+
+ verify(processRecordsInputSpy).checkpointer();
}
@Test
@@ -153,4 +186,13 @@
.build();
return kinesisInput;
}
+
+ private void initializeRecordProcessor() {
+ InitializationInput initializationInput =
+ InitializationInput.builder()
+ .shardId("shard-0000")
+ .extendedSequenceNumber(new ExtendedSequenceNumber("0000"))
+ .build();
+ objectUnderTest.initialize(initializationInput);
+ }
}