Pass stream name and consumer to KinesisConsumer
An instance of `KinesisConsumer` was constucted through a factory that
receives the stream name and the message processor as arguments.
Such arguments however were ignored and the `KinesisConsumer` was
instantiated with the corresponding class fields set as `null`.
Those class fields (`streamName` and `messageProcessor`) were then
populated when `subscribe()` was called, by providing again `streamName`
and `messageProcessor` as arguments.
This was the wrong way of populating `streamName` and
`messageProcessor`, it forced those arguments to be mutable and thus be
`null`, until the `subscribe()` method was called.
Pass stream name and consumer to KinesisConsumer via the existing
Factory by annotating them as @Assisted parameters. This allows to
remove arguments from the `subscribe()` method, since the
`KinesisConsumer` already has those set as class fields.
Change-Id: I1b102fe55b769f4b2899d4ff3fa7e64d36355fa6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 675d801..27335fb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -109,6 +109,6 @@
String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getApplicationName());
KinesisConsumer consumer = consumerFactory.create(streamName, groupId, eventConsumer);
consumers.add(consumer);
- consumer.subscribe(streamName, eventConsumer);
+ consumer.subscribe();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index d17595c..5bf551a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -28,7 +28,7 @@
class KinesisConsumer {
interface Factory {
KinesisConsumer create(
- @Assisted("topic") String topic,
+ @Assisted("streamName") String streamName,
@Assisted("groupId") String groupId,
Consumer<Event> messageProcessor);
}
@@ -42,8 +42,8 @@
private final String groupId;
private Scheduler kinesisScheduler;
- private java.util.function.Consumer<Event> messageProcessor;
- private String streamName;
+ private final java.util.function.Consumer<Event> messageProcessor;
+ private final String streamName;
private AtomicBoolean resetOffset = new AtomicBoolean(false);
@Inject
@@ -52,18 +52,19 @@
CheckpointResetter checkpointResetter,
Configuration configuration,
@ConsumerExecutor ExecutorService executor,
- @Assisted("groupId") String groupId) {
+ @Assisted("streamName") String streamName,
+ @Assisted("groupId") String groupId,
+ @Assisted java.util.function.Consumer<Event> messageProcessor) {
this.schedulerFactory = schedulerFactory;
this.checkpointResetter = checkpointResetter;
this.configuration = configuration;
this.executor = executor;
this.groupId = groupId;
- }
-
- public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
this.streamName = streamName;
this.messageProcessor = messageProcessor;
+ }
+ public void subscribe() {
logger.atInfo().log("Subscribe kinesis consumer to stream [%s]", streamName);
runReceiver(groupId, messageProcessor);
}