EventConsumerIT: Assert on event content

This change modifies the tests not only to assert the event types, but
the content of the events as well.

Change-Id: I4e323761b41b32acb3501e8288ca3aeb7483f7d4
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index fcedfaf..69bd4e9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -15,27 +15,42 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toSet;
 
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.LogThreshold;
 import com.google.gerrit.acceptance.NoHttpd;
-import com.google.gerrit.acceptance.PushOneCommit;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.data.PatchSetAttribute;
+import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.PatchSetCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.query.change.ChangeData;
 import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.junit.Before;
 import org.junit.Test;
 import org.testcontainers.containers.KafkaContainer;
@@ -112,28 +127,87 @@
     LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
     drainQueue(droppedEventsQueue);
 
-    createChange();
-    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
+    ChangeData change = createChange().getChange();
+    String project = change.project().get();
+    int changeNum = change.getId().get();
+    String changeNotesRef = change.notes().getRefName();
+    int patchsetNum = change.currentPatchSet().getPatchSetId();
+    String patchsetRevision = change.currentPatchSet().getRevision().get();
+    String patchsetRef = change.currentPatchSet().getRefName();
 
-    assertThat(createdChangeEvents).contains("change-index");
-    assertThat(createdChangeEvents).contains("ref-updated");
-    assertThat(createdChangeEvents).contains("patchset-created");
+    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
+    assertThat(eventsByType.get("change-index"))
+        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
+
+    assertThat(
+            eventsByType
+                .get("ref-updated")
+                .stream()
+                .map(e -> ((RefUpdatedEvent) e).getRefName())
+                .collect(toSet()))
+        .containsAllOf(
+            changeNotesRef,
+            patchsetRef); // 'refs/sequences/changes' not always updated thus not checked
+
+    List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
+    assertThat(patchSetCreatedEvents).hasSize(1);
+    assertPatchSetAttributes(
+        (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
+        patchsetNum,
+        patchsetRevision,
+        patchsetRef);
+  }
+
+  private void assertPatchSetAttributes(
+      PatchSetCreatedEvent patchSetCreated,
+      int patchsetNum,
+      String patchsetRevision,
+      String patchsetRef) {
+    PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
+    assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
+    assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
+    assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
   }
 
   @Test
   public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
     LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
-    PushOneCommit.Result r = createChange();
+    ChangeData change = createChange().getChange();
+    String project = change.project().get();
+    int changeNum = change.getId().get();
     drainQueue(droppedEventsQueue);
 
     ReviewInput in = ReviewInput.recommend();
     in.message = "LGTM";
-    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+    gApi.changes().id(changeNum).revision("current").review(in);
 
-    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
+    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
 
-    assertThat(createdChangeEvents).contains("change-index");
-    assertThat(createdChangeEvents).contains("comment-added");
+    assertThat(eventsByType.get("change-index"))
+        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
+
+    List<Event> commentAddedEvents = eventsByType.get("comment-added");
+    assertThat(commentAddedEvents).hasSize(1);
+    assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
+        .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
+  }
+
+  private String getParentCommit(ChangeData change) throws Exception {
+    RevCommit parent;
+    try (Repository repo = repoManager.openRepository(change.project());
+        RevWalk walk = new RevWalk(repo)) {
+      RevCommit commit =
+          walk.parseCommit(ObjectId.fromString(change.currentPatchSet().getRevision().get()));
+      parent = commit.getParent(0);
+    }
+    return parent.getId().name();
+  }
+
+  private ChangeIndexEvent createChangeIndexEvent(
+      String projectName, int changeId, String targetSha1) {
+    ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
+    event.targetSha = targetSha1;
+    return event;
   }
 
   private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
@@ -155,20 +229,22 @@
     return droppedEvents;
   }
 
-  private List<String> receiveFromQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
-      throws InterruptedException {
-    List<String> eventsList = new ArrayList<>();
-    SourceAwareEventWrapper event;
-    while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
-      eventsList.add(event.getHeader().getEventType());
-    }
-    return eventsList;
+  private Map<String, List<Event>> receiveEventsByType(
+      LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
+    return drainQueue(queue)
+        .stream()
+        .sorted(Comparator.comparing(e -> e.type))
+        .collect(Collectors.groupingBy(e -> e.type));
   }
 
-  private void drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
+  private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
       throws InterruptedException {
-    while (queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS) != null) {
-      // Just consume the event
+    GsonProvider gsonProvider = plugin.getSysInjector().getInstance(Key.get(GsonProvider.class));
+    SourceAwareEventWrapper event;
+    List<Event> eventsList = new ArrayList<>();
+    while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
+      eventsList.add(event.getEventBody(gsonProvider));
     }
+    return eventsList;
   }
 }