Merge branch 'stable-3.4'

* stable-3.4:
  Set version to 3.4.0.4
  Do not validate if Event.instanceId is populated
  Set version to 3.4.0.3
  Events-broker sends/receives Event object instead of EventMessage
  Set version to 3.4.0.2
  Use EventGson annotation from Gerrit core
  Set version to 3.4.0.1
  Add event deserialization logic
  Set events-broker and Gerrit to 3.4.0
  Make send method async
  Set Gerrit and plugin to v3.4.0-rc2
  Change source instance id type from UUID to String

Change-Id: I8b419e97914e87cf37c78eb3b20380418337ab30
diff --git a/pom.xml b/pom.xml
index 9581073..68f0160 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <auto-value.version>1.7.4</auto-value.version>
-        <gerrit.version>3.5.0-SNAPSHOT</gerrit.version>
+        <gerrit.version>3.4.0</gerrit.version>
     </properties>
 
     <dependencies>
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
index a74bdc2..0894436 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApi.java
@@ -14,35 +14,22 @@
 
 package com.gerritforge.gerrit.eventbroker;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import java.util.Set;
-import java.util.UUID;
 import java.util.function.Consumer;
 
 /** API for sending/receiving events through a message Broker. */
 public interface BrokerApi {
 
   /**
-   * Creates a {@link EventMessage} for an event
-   *
-   * @param instanceId {@link UUID} of the Gerrit instance originating the event
-   * @param event Gerrit event
-   * @return {@link EventMessage} object
-   */
-  default EventMessage newMessage(UUID instanceId, Event event) {
-
-    return new EventMessage(new Header(UUID.randomUUID(), instanceId), event);
-  }
-
-  /**
-   * Send an message to a topic.
+   * Send a message to a topic.
    *
    * @param topic topic name
    * @param message to be send to the topic
-   * @return true if the message was successfully sent. False otherwise.
+   * @return a future that completes with true if the message was sent, false otherwise.
    */
-  boolean send(String topic, EventMessage message);
+  ListenableFuture<Boolean> send(String topic, Event message);
 
   /**
    * Receive asynchronously a message from a topic.
@@ -50,7 +37,7 @@
    * @param topic topic name
    * @param consumer an operation that accepts and process a single message
    */
-  void receiveAsync(String topic, Consumer<EventMessage> consumer);
+  void receiveAsync(String topic, Consumer<Event> consumer);
 
   /**
    * Get the active subscribers
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
index ae6f216..10f3fc0 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/BrokerApiModule.java
@@ -33,5 +33,6 @@
       DynamicItem.itemOf(binder(), BrokerApi.class);
       DynamicItem.bind(binder(), BrokerApi.class).to(InProcessBrokerApi.class).in(Scopes.SINGLETON);
     }
+    bind(EventDeserializer.class).in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/EventDeserializer.java b/src/main/java/com/gerritforge/gerrit/eventbroker/EventDeserializer.java
new file mode 100644
index 0000000..986fab2
--- /dev/null
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/EventDeserializer.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+
+public class EventDeserializer {
+
+  private Gson gson;
+
+  @Inject
+  public EventDeserializer(@EventGson Gson gson) {
+    this.gson = gson;
+  }
+
+  public Event deserialize(String json) {
+    Event resultEvent;
+    EventMessage eventMessage = gson.fromJson(json, EventMessage.class);
+    if (eventMessage.getEvent() == null && eventMessage.getHeader() == null) {
+      resultEvent = gson.fromJson(json, Event.class);
+    } else {
+      eventMessage.validate();
+      resultEvent = eventMessage.getEvent();
+      if (Strings.isNullOrEmpty(resultEvent.instanceId)) {
+        resultEvent.instanceId = eventMessage.getHeader().sourceInstanceId;
+      }
+    }
+    validate(resultEvent);
+    return resultEvent;
+  }
+
+  private void validate(Event event) {
+    requireNonNull(event.type, "Event type cannot be null");
+  }
+}
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java b/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java
index 204173e..0684ae4 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/EventMessage.java
@@ -48,12 +48,16 @@
     public final UUID eventId;
 
     /** Gerrit server instance id from which event was sent. */
-    public final UUID sourceInstanceId;
+    public final String sourceInstanceId;
 
     /** @deprecated required for interoperability with older JSON wire protocols */
     public final String eventType;
 
     public Header(UUID eventId, UUID sourceInstanceId) {
+      this(eventId, sourceInstanceId.toString());
+    }
+
+    public Header(UUID eventId, String sourceInstanceId) {
       this.eventId = eventId;
       this.sourceInstanceId = sourceInstanceId;
       this.eventType = "";
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
index c8daa0e..a039d24 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/InProcessBrokerApi.java
@@ -22,6 +22,9 @@
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gerrit.server.events.Event;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -32,7 +35,7 @@
 
   private static final Integer DEFAULT_MESSAGE_QUEUE_SIZE = 100;
 
-  private final Map<String, EvictingQueue<EventMessage>> messagesQueueMap;
+  private final Map<String, EvictingQueue<Event>> messagesQueueMap;
   private final Map<String, EventBus> eventBusMap;
   private final Set<TopicSubscriber> topicSubscribers;
 
@@ -43,21 +46,22 @@
   }
 
   @Override
-  public boolean send(String topic, EventMessage message) {
+  public ListenableFuture<Boolean> send(String topic, Event message) {
     EventBus topicEventConsumers = eventBusMap.get(topic);
-    try {
-      if (topicEventConsumers != null) {
-        topicEventConsumers.post(message);
-      }
-    } catch (RuntimeException e) {
-      log.atSevere().withCause(e).log();
-      return false;
+    SettableFuture<Boolean> future = SettableFuture.create();
+
+    if (topicEventConsumers != null) {
+      topicEventConsumers.post(message);
+      future.set(true);
+    } else {
+      future.set(false);
     }
-    return true;
+
+    return future;
   }
 
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+  public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
     EventBus topicEventConsumers = eventBusMap.get(topic);
     if (topicEventConsumers == null) {
       topicEventConsumers = new EventBus(topic);
@@ -67,7 +71,7 @@
     topicEventConsumers.register(eventConsumer);
     topicSubscribers.add(topicSubscriber(topic, eventConsumer));
 
-    EvictingQueue<EventMessage> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
+    EvictingQueue<Event> messageQueue = EvictingQueue.create(DEFAULT_MESSAGE_QUEUE_SIZE);
     messagesQueueMap.put(topic, messageQueue);
     topicEventConsumers.register(new EventBusMessageRecorder(messageQueue));
   }
@@ -97,7 +101,7 @@
     }
 
     @Subscribe
-    public void recordCustomerChange(EventMessage e) {
+    public void recordCustomerChange(Event e) {
       if (!messagesQueue.contains(e)) {
         messagesQueue.add(e);
       }
diff --git a/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java
index afe09f2..3fecc48 100644
--- a/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java
+++ b/src/main/java/com/gerritforge/gerrit/eventbroker/TopicSubscriber.java
@@ -15,15 +15,16 @@
 package com.gerritforge.gerrit.eventbroker;
 
 import com.google.auto.value.AutoValue;
+import com.google.gerrit.server.events.Event;
 import java.util.function.Consumer;
 
 @AutoValue
 public abstract class TopicSubscriber {
-  public static TopicSubscriber topicSubscriber(String topic, Consumer<EventMessage> consumer) {
+  public static TopicSubscriber topicSubscriber(String topic, Consumer<Event> consumer) {
     return new AutoValue_TopicSubscriber(topic, consumer);
   }
 
   public abstract String topic();
 
-  public abstract Consumer<EventMessage> consumer();
+  public abstract Consumer<Event> consumer();
 }
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
index dd0edfa..5f6c73d 100644
--- a/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/BrokerApiTest.java
@@ -29,6 +29,9 @@
 import com.google.gson.JsonObject;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,8 +44,9 @@
 @RunWith(MockitoJUnitRunner.class)
 public class BrokerApiTest {
 
-  @Captor ArgumentCaptor<EventMessage> eventCaptor;
-  Consumer<EventMessage> eventConsumer;
+  public static final int SEND_FUTURE_TIMEOUT = 1;
+  @Captor ArgumentCaptor<Event> eventCaptor;
+  Consumer<Event> eventConsumer;
 
   BrokerApi brokerApiUnderTest;
   UUID instanceId = UUID.randomUUID();
@@ -55,31 +59,37 @@
   }
 
   @Test
-  public void shouldSendEvent() {
+  public void shouldSendEvent() throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
 
-    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+    assertThat(brokerApiUnderTest.send("topic", wrap(event)).get(1, TimeUnit.SECONDS)).isTrue();
+
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
   }
 
-  private EventMessage wrap(ProjectCreatedEvent event) {
-    return brokerApiUnderTest.newMessage(instanceId, event);
+  private Event wrap(ProjectCreatedEvent event) {
+    return event;
   }
 
   @Test
-  public void shouldRegisterConsumerPerTopic() {
-    Consumer<EventMessage> secondConsumer = mockEventConsumer();
-    ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+  public void shouldRegisterConsumerPerTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
+    Consumer<Event> secondConsumer = mockEventConsumer();
+    ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
     ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic2", secondConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
-    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+    brokerApiUnderTest
+        .send("topic2", wrap(eventForTopic2))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
     compareWithExpectedEvent(secondConsumer, secondArgCaptor, eventForTopic2);
@@ -87,10 +97,10 @@
 
   @Test
   public void shouldReturnMapOfConsumersPerTopic() {
-    Consumer<EventMessage> firstConsumerTopicA = mockEventConsumer();
+    Consumer<Event> firstConsumerTopicA = mockEventConsumer();
 
-    Consumer<EventMessage> secondConsumerTopicA = mockEventConsumer();
-    Consumer<EventMessage> thirdConsumerTopicB = mockEventConsumer();
+    Consumer<Event> secondConsumerTopicA = mockEventConsumer();
+    Consumer<Event> thirdConsumerTopicB = mockEventConsumer();
 
     brokerApiUnderTest.receiveAsync("TopicA", firstConsumerTopicA);
     brokerApiUnderTest.receiveAsync("TopicA", secondConsumerTopicA);
@@ -108,83 +118,99 @@
   }
 
   @Test
-  public void shouldDeliverEventToAllRegisteredConsumers() {
-    Consumer<EventMessage> secondConsumer = mockEventConsumer();
-    ArgumentCaptor<EventMessage> secondArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+  public void shouldDeliverAsynchronouslyEventToAllRegisteredConsumers()
+      throws InterruptedException, TimeoutException, ExecutionException {
+    Consumer<Event> secondConsumer = mockEventConsumer();
+    ArgumentCaptor<Event> secondArgCaptor = ArgumentCaptor.forClass(Event.class);
 
     ProjectCreatedEvent event = testProjectCreatedEvent("Project name");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic", secondConsumer);
-    brokerApiUnderTest.send("topic", wrap(event));
+    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
     compareWithExpectedEvent(secondConsumer, secondArgCaptor, event);
   }
 
   @Test
-  public void shouldReceiveEventsOnlyFromRegisteredTopic() {
+  public void shouldReceiveEventsOnlyFromRegisteredTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     ProjectCreatedEvent eventForTopic2 = testProjectCreatedEvent("Project name 2");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
-    brokerApiUnderTest.send("topic2", wrap(eventForTopic2));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+    brokerApiUnderTest
+        .send("topic2", wrap(eventForTopic2))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
   }
 
   @Test
-  public void shouldNotRegisterTheSameConsumerTwicePerTopic() {
+  public void shouldNotRegisterTheSameConsumerTwicePerTopic()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(event));
+    brokerApiUnderTest.send("topic", wrap(event)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
   }
 
   @Test
-  public void shouldReconnectSubscribers() {
-    ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+  public void shouldReconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
+    ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, eventCaptor, eventForTopic);
 
-    Consumer<EventMessage> newConsumer = mockEventConsumer();
+    Consumer<Event> newConsumer = mockEventConsumer();
 
     clearInvocations(eventConsumer);
 
     brokerApiUnderTest.disconnect();
     brokerApiUnderTest.receiveAsync("topic", newConsumer);
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(newConsumer, newConsumerArgCaptor, eventForTopic);
     verify(eventConsumer, never()).accept(eventCaptor.capture());
   }
 
   @Test
-  public void shouldDisconnectSubscribers() {
+  public void shouldDisconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
     brokerApiUnderTest.disconnect();
 
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     verify(eventConsumer, never()).accept(eventCaptor.capture());
   }
 
   @Test
-  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers() {
-    ArgumentCaptor<EventMessage> newConsumerArgCaptor = ArgumentCaptor.forClass(EventMessage.class);
+  public void shouldBeAbleToSwitchBrokerAndReconnectSubscribers()
+      throws InterruptedException, TimeoutException, ExecutionException {
+    ArgumentCaptor<Event> newConsumerArgCaptor = ArgumentCaptor.forClass(Event.class);
 
     ProjectCreatedEvent eventForTopic = testProjectCreatedEvent("Project name");
 
@@ -194,22 +220,29 @@
 
     clearInvocations(eventConsumer);
 
-    brokerApiUnderTest.send("topic", wrap(eventForTopic));
+    brokerApiUnderTest
+        .send("topic", wrap(eventForTopic))
+        .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
     verify(eventConsumer, never()).accept(eventCaptor.capture());
 
     clearInvocations(eventConsumer);
-    secondaryBroker.send("topic", wrap(eventForTopic));
+    secondaryBroker.send("topic", wrap(eventForTopic)).get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS);
 
     compareWithExpectedEvent(eventConsumer, newConsumerArgCaptor, eventForTopic);
   }
 
   @Test
-  public void shouldReplayAllEvents() {
+  public void shouldReplayAllEvents()
+      throws InterruptedException, TimeoutException, ExecutionException {
     ProjectCreatedEvent event = new ProjectCreatedEvent();
 
     brokerApiUnderTest.receiveAsync("topic", eventConsumer);
 
-    assertThat(brokerApiUnderTest.send("topic", wrap(event))).isTrue();
+    assertThat(
+            brokerApiUnderTest
+                .send("topic", wrap(event))
+                .get(SEND_FUTURE_TIMEOUT, TimeUnit.SECONDS))
+        .isTrue();
 
     verify(eventConsumer, times(1)).accept(eventCaptor.capture());
     compareWithExpectedEvent(eventConsumer, eventCaptor, event);
@@ -232,11 +265,11 @@
     return eventForTopic;
   }
 
-  private interface Subscriber extends Consumer<EventMessage> {
+  private interface Subscriber extends Consumer<Event> {
 
     @Override
     @Subscribe
-    void accept(EventMessage eventMessage);
+    void accept(Event eventMessage);
   }
 
   @SuppressWarnings("unchecked")
@@ -245,11 +278,9 @@
   }
 
   private void compareWithExpectedEvent(
-      Consumer<EventMessage> eventConsumer,
-      ArgumentCaptor<EventMessage> eventCaptor,
-      Event expectedEvent) {
+      Consumer<Event> eventConsumer, ArgumentCaptor<Event> eventCaptor, Event expectedEvent) {
     verify(eventConsumer, times(1)).accept(eventCaptor.capture());
-    assertThat(eventCaptor.getValue().getEvent()).isEqualTo(expectedEvent);
+    assertThat(eventCaptor.getValue()).isEqualTo(expectedEvent);
   }
 
   private JsonObject eventToJson(Event event) {
diff --git a/src/test/java/com/gerritforge/gerrit/eventbroker/EventDeserializerTest.java b/src/test/java/com/gerritforge/gerrit/eventbroker/EventDeserializerTest.java
new file mode 100644
index 0000000..1bc2535
--- /dev/null
+++ b/src/test/java/com/gerritforge/gerrit/eventbroker/EventDeserializerTest.java
@@ -0,0 +1,95 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.gerrit.eventbroker;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gson.Gson;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+public class EventDeserializerTest {
+  private EventDeserializer deserializer;
+
+  @Before
+  public void setUp() {
+    final Gson gson = new EventGsonProvider().get();
+    deserializer = new EventDeserializer(gson);
+  }
+
+  @Test
+  public void eventDeserializerShouldParseEventMessage() {
+    final UUID eventId = UUID.randomUUID();
+    final String eventType = "event-type";
+    final String sourceInstanceId = UUID.randomUUID().toString();
+    final long eventCreatedOn = 10L;
+    final String eventJson =
+        String.format(
+            "{ "
+                + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
+                + "\"body\": { \"type\": \"project-created\" }"
+                + "}",
+            eventId, eventType, sourceInstanceId, eventCreatedOn);
+    final Event event = deserializer.deserialize(eventJson);
+
+    assertThat(event.instanceId).isEqualTo(sourceInstanceId);
+  }
+
+  @Test
+  public void eventDeserializerShouldParseEvent() {
+    final String eventJson = "{ \"type\": \"project-created\", \"instanceId\":\"instance-id\" }";
+    final Event event = deserializer.deserialize(eventJson);
+
+    assertThat(event.instanceId).isEqualTo("instance-id");
+  }
+
+  @Test
+  public void eventDeserializerShouldParseEventWithoutInstanceId() {
+    final String eventJson = "{ \"type\": \"project-created\" }";
+    final Event event = deserializer.deserialize(eventJson);
+
+    assertThat(event.instanceId).isNull();
+  }
+
+  @Test
+  public void eventDeserializerShouldParseEventWhenInstanceIdIsEmpty() {
+    final String eventJson = "{ \"type\": \"project-created\", \"instanceId\":\"\" }";
+    final Event event = deserializer.deserialize(eventJson);
+
+    assertThat(event.instanceId).isEmpty();
+  }
+
+  @Test
+  public void eventDeserializerShouldParseEventWithHeaderAndBodyProjectName() {
+    final String eventJson =
+        "{\"projectName\":\"header_body_parser_project\",\"type\":\"project-created\", \"instanceId\":\"instance-id\"}";
+    final Event event = deserializer.deserialize(eventJson);
+
+    assertThat(event.instanceId).isEqualTo("instance-id");
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void eventDeserializerShouldFailForInvalidJson() {
+    deserializer.deserialize("this is not a JSON string");
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void eventDeserializerShouldFailForInvalidObjectButValidJSON() {
+    deserializer.deserialize("{}");
+  }
+}