Pass stream name and consumer to KinesisConsumer

An instance of `KinesisConsumer` was constucted through a factory that
receives the stream name and the message processor as arguments.

Such arguments however were ignored and the `KinesisConsumer` was
instantiated with the corresponding class fields set as `null`.

Those class fields (`streamName` and `messageProcessor`) were then
populated when `subscribe()` was called, by providing again `streamName`
and `messageProcessor` as arguments.

This was the wrong way of populating `streamName` and
`messageProcessor`, it forced those arguments to be mutable and thus be
`null`, until the `subscribe()` method was called.

Pass stream name and consumer to KinesisConsumer via the existing
Factory by annotating them as @Assisted parameters. This allows to
remove arguments from the `subscribe()` method, since the
`KinesisConsumer` already has those set as class fields.

Change-Id: I1b102fe55b769f4b2899d4ff3fa7e64d36355fa6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 675d801..27335fb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -109,6 +109,6 @@
     String groupId = Optional.ofNullable(maybeGroupId).orElse(configuration.getApplicationName());
     KinesisConsumer consumer = consumerFactory.create(streamName, groupId, eventConsumer);
     consumers.add(consumer);
-    consumer.subscribe(streamName, eventConsumer);
+    consumer.subscribe();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index d17595c..5bf551a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -28,7 +28,7 @@
 class KinesisConsumer {
   interface Factory {
     KinesisConsumer create(
-        @Assisted("topic") String topic,
+        @Assisted("streamName") String streamName,
         @Assisted("groupId") String groupId,
         Consumer<Event> messageProcessor);
   }
@@ -42,8 +42,8 @@
   private final String groupId;
   private Scheduler kinesisScheduler;
 
-  private java.util.function.Consumer<Event> messageProcessor;
-  private String streamName;
+  private final java.util.function.Consumer<Event> messageProcessor;
+  private final String streamName;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
   @Inject
@@ -52,18 +52,19 @@
       CheckpointResetter checkpointResetter,
       Configuration configuration,
       @ConsumerExecutor ExecutorService executor,
-      @Assisted("groupId") String groupId) {
+      @Assisted("streamName") String streamName,
+      @Assisted("groupId") String groupId,
+      @Assisted java.util.function.Consumer<Event> messageProcessor) {
     this.schedulerFactory = schedulerFactory;
     this.checkpointResetter = checkpointResetter;
     this.configuration = configuration;
     this.executor = executor;
     this.groupId = groupId;
-  }
-
-  public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
     this.streamName = streamName;
     this.messageProcessor = messageProcessor;
+  }
 
+  public void subscribe() {
     logger.atInfo().log("Subscribe kinesis consumer to stream [%s]", streamName);
     runReceiver(groupId, messageProcessor);
   }