Adding unit tests for KafkaEventDeserializer

Change-Id: I38a10f34d76f431a6a3e4a9fd767c450ca1a1b41
diff --git a/BUILD b/BUILD
index 54e1d5c..bab2335 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,7 @@
     resources = glob(["src/main/resources/**/*"]),
     deps = [
         "@kafka_client//jar",
+        "@commons-lang3//jar",
     ],
 )
 
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index e8f2e60..472f962 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -49,3 +49,9 @@
         artifact = "org.testcontainers:kafka:1.10.6",
         sha1 = "5984e31306bd6c84a36092cdd19e0ef7e2268d98",
     )
+
+    maven_jar(
+        name = "commons-lang3",
+        artifact = "org.apache.commons:commons-lang3:3.6",
+        sha1 = "9d28a6b23650e8a7e9063c04588ace6cf7012c17",
+    )
\ No newline at end of file
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index f8b8409..cb84398 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -22,7 +22,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.BrokerReadEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,12 +61,18 @@
   }
 
   private String getPayload(Event event) {
-    JsonObject obj = gson.toJsonTree(event).getAsJsonObject();
-    BrokerReadEvent eventWithHeader =
-        new BrokerReadEvent(
-            new BrokerReadEvent.KafkaEventHeader(
-                UUID.randomUUID(), event.getType(), instanceId, System.currentTimeMillis()),
-            obj);
-    return gson.toJson(eventWithHeader);
+    return gson.toJson(toBrokerEvent(event));
+  }
+
+  private SourceAwareEventWrapper toBrokerEvent(Event event) {
+    JsonObject body = eventToJson(event);
+    return new SourceAwareEventWrapper(
+        new SourceAwareEventWrapper.KafkaEventHeader(
+            UUID.randomUUID(), event.getType(), instanceId, event.eventCreatedOn),
+        body);
+  }
+
+  private JsonObject eventToJson(Event event) {
+    return gson.toJsonTree(event).getAsJsonObject();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEntry.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEntry.java
index 5d213f3..53abf32 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEntry.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheEntry.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
+import com.google.common.base.Objects;
 import com.googlesource.gerrit.plugins.multisite.cache.Constants;
 
 /** Represents a cache entry to evict */
@@ -62,4 +63,19 @@
     }
     return new CacheEntry(Constants.GERRIT, cache, key);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CacheEntry that = (CacheEntry) o;
+    return Objects.equal(pluginName, that.pluginName)
+        && Objects.equal(cacheName, that.cacheName)
+        && Objects.equal(key, that.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(pluginName, cacheName, key);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
index 2b7fdfd..6fc8171 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
@@ -16,7 +16,6 @@
 
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.server.index.account.AccountIndexer;
-import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -42,8 +41,7 @@
   }
 
   @Override
-  protected void doIndex(Account.Id id, Optional<AccountIndexEvent> event)
-      throws IOException, OrmException {
+  protected void doIndex(Account.Id id, Optional<AccountIndexEvent> event) throws IOException {
     indexer.index(id);
     log.debug("Account {} successfully indexed", id);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouter.java
index 67b75d3..e920069 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouter.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
@@ -96,7 +97,9 @@
       Object parsedKey =
           GsonParser.fromJson(cacheEvictionEvent.cacheName, cacheEvictionEvent.key.toString());
       cacheEvictionHanlder.evict(CacheEntry.from(cacheEvictionEvent.cacheName, parsedKey));
-    } else if (sourceEvent instanceof Event) {
+    } else if (sourceEvent instanceof RefEvent) {
+      // ForwardedEventHandler can receive any Event subclass but actually just processes subclasses
+      // of RefEvent
       streamEventHandler.dispatch(sourceEvent);
     } else {
       throw new UnsupportedOperationException(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index c553f04..0c76ba7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -28,7 +28,8 @@
   protected void configure() {
     MultiSiteEvent.registerEventTypes();
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-    bind(new TypeLiteral<Deserializer<BrokerReadEvent>>() {}).to(KafkaEventDeserializer.class);
+    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+        .to(KafkaEventDeserializer.class);
 
     bind(Executor.class)
         .annotatedWith(ConsumerExecutor.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
index 61f43eb..9a589df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 @Singleton
-public class KafkaEventDeserializer implements Deserializer<BrokerReadEvent> {
+public class KafkaEventDeserializer implements Deserializer<SourceAwareEventWrapper> {
 
   private final StringDeserializer stringDeserializer = new StringDeserializer();
   private Provider<Gson> gsonProvider;
@@ -44,10 +44,15 @@
   }
 
   @Override
-  public BrokerReadEvent deserialize(String topic, byte[] data) {
-    return gsonProvider
-        .get()
-        .fromJson(stringDeserializer.deserialize(topic, data), BrokerReadEvent.class);
+  public SourceAwareEventWrapper deserialize(String topic, byte[] data) {
+    final SourceAwareEventWrapper result =
+        gsonProvider
+            .get()
+            .fromJson(stringDeserializer.deserialize(topic, data), SourceAwareEventWrapper.class);
+
+    result.validate();
+
+    return result;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaSubcriber.java
index d7f688c..827b365 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaSubcriber.java
@@ -32,13 +32,13 @@
   private final Provider<Gson> gsonProvider;
   private final UUID instanceId;
   private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final Deserializer<BrokerReadEvent> valueDeserializer;
+  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
 
   @Inject
   public KafkaSubcriber(
       Configuration configuration,
       Deserializer<byte[]> keyDeserializer,
-      Deserializer<BrokerReadEvent> valueDeserializer,
+      Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId) {
@@ -80,7 +80,7 @@
   private void processRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
     try {
 
-      BrokerReadEvent event =
+      SourceAwareEventWrapper event =
           valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
 
       if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/BrokerReadEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
similarity index 78%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/BrokerReadEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
index 191bd39..3e68265 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/BrokerReadEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
@@ -19,8 +19,9 @@
 import com.google.gson.JsonObject;
 import com.google.inject.Provider;
 import java.util.UUID;
+import org.apache.commons.lang3.Validate;
 
-public class BrokerReadEvent {
+public class SourceAwareEventWrapper {
 
   private final KafkaEventHeader header;
   private final JsonObject body;
@@ -67,15 +68,21 @@
       return eventCreatedOn;
     }
 
-    @Override
-    public String toString() {
-      return String.format(
-          "ts=%s, id=%s, type=%s, source=%s", eventCreatedOn, eventId, eventType, sourceInstanceId);
+    public void validate() {
+      Validate.notNull(eventId, "EventId cannot be null");
+      Validate.notNull(eventType, "EventType cannot be null");
+      Validate.notNull(sourceInstanceId, "Source Instance ID cannot be null");
     }
   }
 
-  public BrokerReadEvent(KafkaEventHeader header, JsonObject body) {
+  public SourceAwareEventWrapper(KafkaEventHeader header, JsonObject body) {
     this.header = header;
     this.body = body;
   }
+
+  public void validate() {
+    Validate.notNull(header, "Header cannot be null");
+    Validate.notNull(body, "Body cannot be null");
+    header.validate();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouterTest.java
new file mode 100644
index 0000000..efb0c4e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ForwardedEventRouterTest.java
@@ -0,0 +1,215 @@
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.client.AccountGroup;
+import com.google.gerrit.reviewdb.client.Branch;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.time.TimeUtil;
+import com.google.gwtorm.client.KeyUtil;
+import com.google.gwtorm.server.StandardKeyEncoder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEntry;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexGroupHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexProjectHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import java.util.Optional;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ForwardedEventRouterTest {
+
+  static {
+    KeyUtil.setEncoderImpl(new StandardKeyEncoder());
+  }
+
+  private ForwardedEventRouter router;
+  @Mock private ForwardedCacheEvictionHandler cacheEvictionHandler;
+  @Mock private ForwardedIndexAccountHandler indexAccountHandler;
+  @Mock private ForwardedIndexChangeHandler indexChangeHandler;
+  @Mock private ForwardedIndexGroupHandler indexGroupHandler;
+  @Mock private ForwardedIndexProjectHandler indexProjectHandler;
+  @Mock private ForwardedEventHandler streamEventHandler;
+
+  @Before
+  public void setUp() {
+    router =
+        new ForwardedEventRouter(
+            cacheEvictionHandler,
+            indexAccountHandler,
+            indexChangeHandler,
+            indexGroupHandler,
+            indexProjectHandler,
+            streamEventHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_CacheEviction() throws Exception {
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key");
+    router.route(event);
+
+    verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
+
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexChangeHandler,
+        indexGroupHandler,
+        indexProjectHandler,
+        streamEventHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_AccountIndex() throws Exception {
+    final AccountIndexEvent event = new AccountIndexEvent(1);
+    router.route(event);
+
+    verify(indexAccountHandler)
+        .index(
+            new Account.Id(event.accountId),
+            ForwardedIndexingHandler.Operation.INDEX,
+            Optional.of(event));
+
+    verifyZeroInteractions(
+        indexChangeHandler,
+        indexGroupHandler,
+        indexProjectHandler,
+        streamEventHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
+    final String groupId = "12";
+    final GroupIndexEvent event = new GroupIndexEvent(groupId);
+    router.route(event);
+
+    verify(indexGroupHandler)
+        .index(
+            new AccountGroup.UUID(groupId),
+            ForwardedIndexingHandler.Operation.INDEX,
+            Optional.of(event));
+
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexChangeHandler,
+        indexProjectHandler,
+        streamEventHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_ProjectIndex() throws Exception {
+    final String projectName = "projectName";
+    final ProjectIndexEvent event = new ProjectIndexEvent(projectName);
+    router.route(event);
+
+    verify(indexProjectHandler)
+        .index(
+            Project.NameKey.parse(projectName),
+            ForwardedIndexingHandler.Operation.INDEX,
+            Optional.of(event));
+
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexChangeHandler,
+        indexGroupHandler,
+        streamEventHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndex() throws Exception {
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false);
+    router.route(event);
+
+    verify(indexChangeHandler)
+        .index(
+            event.projectName + "~" + event.changeId,
+            ForwardedIndexingHandler.Operation.INDEX,
+            Optional.of(event));
+
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexGroupHandler,
+        indexProjectHandler,
+        streamEventHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndexDelete() throws Exception {
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true);
+    router.route(event);
+
+    verify(indexChangeHandler)
+        .index(
+            event.projectName + "~" + event.changeId,
+            ForwardedIndexingHandler.Operation.DELETE,
+            Optional.of(event));
+
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexGroupHandler,
+        indexProjectHandler,
+        streamEventHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldSendEventsToTheAppropriateHandler_StreamEvent() throws Exception {
+    final CommentAddedEvent event = new CommentAddedEvent(aChange());
+    router.route(event);
+    verify(streamEventHandler).dispatch(event);
+    verifyZeroInteractions(
+        indexAccountHandler,
+        indexChangeHandler,
+        indexGroupHandler,
+        indexProjectHandler,
+        cacheEvictionHandler);
+  }
+
+  @Test
+  public void routerShouldFailForNotRecognisedEvents() throws Exception {
+    final Event newEventType = new Event("new-type") {};
+
+    try {
+      router.route(newEventType);
+      Assert.fail("Expected exception for not supported event");
+    } catch (UnsupportedOperationException expected) {
+      verifyZeroInteractions(
+          indexAccountHandler,
+          indexChangeHandler,
+          indexGroupHandler,
+          indexProjectHandler,
+          streamEventHandler,
+          cacheEvictionHandler);
+    }
+  }
+
+  private Change aChange() {
+    return new Change(
+        new Change.Key("Iabcd1234abcd1234abcd1234abcd1234abcd1234"),
+        new Change.Id(1),
+        new Account.Id(1),
+        new Branch.NameKey("proj", "refs/heads/master"),
+        TimeUtil.nowTs());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
new file mode 100644
index 0000000..4198581
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
@@ -0,0 +1,65 @@
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Supplier;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDeserializer;
+import com.google.gerrit.server.events.SupplierDeserializer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.inject.Provider;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaEventDeserializerTest {
+  private KafkaEventDeserializer deserializer;
+
+  @Before
+  public void setUp() {
+    final Provider<Gson> gsonProvider = buildGsonProvider();
+    deserializer = new KafkaEventDeserializer(gsonProvider);
+  }
+
+  @Test
+  public void kafkaEventDeserializerShouldParseAKafkaEvent() {
+    final UUID eventId = UUID.randomUUID();
+    final String eventType = "event-type";
+    final UUID sourceInstanceId = UUID.randomUUID();
+    final long eventCreatedOn = 10L;
+    final String eventJson =
+        String.format(
+            "{ "
+                + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
+                + "\"body\": {}"
+                + "}",
+            eventId, eventType, sourceInstanceId, eventCreatedOn);
+    final SourceAwareEventWrapper event = deserializer.deserialize("ignored", eventJson.getBytes());
+
+    assertThat(event.getBody().entrySet()).isEmpty();
+    assertThat(event.getHeader().getEventId()).isEqualTo(eventId);
+    assertThat(event.getHeader().getEventType()).isEqualTo(eventType);
+    assertThat(event.getHeader().getSourceInstanceId()).isEqualTo(sourceInstanceId);
+    assertThat(event.getHeader().getEventCreatedOn()).isEqualTo(eventCreatedOn);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void kafkaEventDeserializerShouldFailForInvalidJson() {
+    deserializer.deserialize("ignored", "this is not a JSON string".getBytes());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
+    deserializer.deserialize("ignored", "{}".getBytes());
+  }
+
+  private Provider<Gson> buildGsonProvider() {
+    Gson gson =
+        new GsonBuilder()
+            .registerTypeAdapter(Event.class, new EventDeserializer())
+            .registerTypeAdapter(Supplier.class, new SupplierDeserializer())
+            .create();
+    return () -> gson;
+  }
+}