Stop using Gerrit instanceId as partition key

Using the Gerrit instanceId as partition key might lead to
IllegalArgument exceptions, due to the fact that it can be null (since
it is an optional configuration).

Moreover, using _only_ the instanceId would lead to a suboptimal
sharding of the messages across different kinesis shards, in case where
the majority of messages would come from one active gerrit instance.

Set the partition key to the event type, which is always present and
provides a better distribution than instanceId.

Bug: Issue 14697
Change-Id: I467e79d8c98958cc68911a54a684ae35afe26547
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 9846850..0316c06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -18,8 +18,6 @@
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Set;
@@ -30,16 +28,12 @@
 class KinesisBrokerApi implements BrokerApi {
   private final KinesisConsumer.Factory consumerFactory;
 
-  private final Gson gson;
   private final KinesisPublisher kinesisPublisher;
   private final Set<KinesisConsumer> consumers;
 
   @Inject
   public KinesisBrokerApi(
-      @EventGson Gson gson,
-      KinesisPublisher kinesisPublisher,
-      KinesisConsumer.Factory consumerFactory) {
-    this.gson = gson;
+      KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
     this.kinesisPublisher = kinesisPublisher;
     this.consumerFactory = consumerFactory;
     this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -47,7 +41,7 @@
 
   @Override
   public ListenableFuture<Boolean> send(String streamName, Event event) {
-    return kinesisPublisher.publish(streamName, gson.toJson(event), event.instanceId);
+    return kinesisPublisher.publish(streamName, event);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index d54540d..158f8ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -58,14 +58,14 @@
 
   @Override
   public void onEvent(Event event) {
-    publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
+    publish(configuration.getStreamEventsTopic(), event);
   }
 
-  ListenableFuture<Boolean> publish(String streamName, String stringEvent, String partitionKey) {
+  ListenableFuture<Boolean> publish(String streamName, Event event) {
     if (configuration.isSendAsync()) {
-      return publishAsync(streamName, stringEvent, partitionKey);
+      return publishAsync(streamName, gson.toJson(event), event.getType());
     }
-    return publishSync(streamName, stringEvent, partitionKey);
+    return publishSync(streamName, gson.toJson(event), event.getType());
   }
 
   private ListenableFuture<Boolean> publishSync(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 6b0eeb4..940cd31 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -123,6 +123,25 @@
   @Test
   @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
   @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldConsumeAnEventWithoutInstanceId() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+
+    Event event = eventMessage();
+    event.instanceId = null;
+
+    kinesisBroker().send(streamName, event);
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
   public void shouldReplayMessages() throws Exception {
     String streamName = UUID.randomUUID().toString();
     createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);