EventConsumerIT: Assert on event content
This change modifies the tests not only to assert the event types, but
the content of the events as well.
Change-Id: I4e323761b41b32acb3501e8288ca3aeb7483f7d4
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index fcedfaf..69bd4e9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -15,27 +15,42 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toSet;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.LogThreshold;
import com.google.gerrit.acceptance.NoHttpd;
-import com.google.gerrit.acceptance.PushOneCommit;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.data.PatchSetAttribute;
+import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.PatchSetCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.query.change.ChangeData;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.Module;
import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
@@ -112,28 +127,87 @@
LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
drainQueue(droppedEventsQueue);
- createChange();
- List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
+ ChangeData change = createChange().getChange();
+ String project = change.project().get();
+ int changeNum = change.getId().get();
+ String changeNotesRef = change.notes().getRefName();
+ int patchsetNum = change.currentPatchSet().getPatchSetId();
+ String patchsetRevision = change.currentPatchSet().getRevision().get();
+ String patchsetRef = change.currentPatchSet().getRefName();
- assertThat(createdChangeEvents).contains("change-index");
- assertThat(createdChangeEvents).contains("ref-updated");
- assertThat(createdChangeEvents).contains("patchset-created");
+ Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
+ assertThat(eventsByType.get("change-index"))
+ .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
+
+ assertThat(
+ eventsByType
+ .get("ref-updated")
+ .stream()
+ .map(e -> ((RefUpdatedEvent) e).getRefName())
+ .collect(toSet()))
+ .containsAllOf(
+ changeNotesRef,
+ patchsetRef); // 'refs/sequences/changes' not always updated thus not checked
+
+ List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
+ assertThat(patchSetCreatedEvents).hasSize(1);
+ assertPatchSetAttributes(
+ (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
+ patchsetNum,
+ patchsetRevision,
+ patchsetRef);
+ }
+
+ private void assertPatchSetAttributes(
+ PatchSetCreatedEvent patchSetCreated,
+ int patchsetNum,
+ String patchsetRevision,
+ String patchsetRef) {
+ PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
+ assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
+ assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
+ assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
}
@Test
public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
- PushOneCommit.Result r = createChange();
+ ChangeData change = createChange().getChange();
+ String project = change.project().get();
+ int changeNum = change.getId().get();
drainQueue(droppedEventsQueue);
ReviewInput in = ReviewInput.recommend();
in.message = "LGTM";
- gApi.changes().id(r.getChangeId()).revision("current").review(in);
+ gApi.changes().id(changeNum).revision("current").review(in);
- List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
+ Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
- assertThat(createdChangeEvents).contains("change-index");
- assertThat(createdChangeEvents).contains("comment-added");
+ assertThat(eventsByType.get("change-index"))
+ .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
+
+ List<Event> commentAddedEvents = eventsByType.get("comment-added");
+ assertThat(commentAddedEvents).hasSize(1);
+ assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
+ .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
+ }
+
+ private String getParentCommit(ChangeData change) throws Exception {
+ RevCommit parent;
+ try (Repository repo = repoManager.openRepository(change.project());
+ RevWalk walk = new RevWalk(repo)) {
+ RevCommit commit =
+ walk.parseCommit(ObjectId.fromString(change.currentPatchSet().getRevision().get()));
+ parent = commit.getParent(0);
+ }
+ return parent.getId().name();
+ }
+
+ private ChangeIndexEvent createChangeIndexEvent(
+ String projectName, int changeId, String targetSha1) {
+ ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
+ event.targetSha = targetSha1;
+ return event;
}
private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
@@ -155,20 +229,22 @@
return droppedEvents;
}
- private List<String> receiveFromQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
- throws InterruptedException {
- List<String> eventsList = new ArrayList<>();
- SourceAwareEventWrapper event;
- while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
- eventsList.add(event.getHeader().getEventType());
- }
- return eventsList;
+ private Map<String, List<Event>> receiveEventsByType(
+ LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
+ return drainQueue(queue)
+ .stream()
+ .sorted(Comparator.comparing(e -> e.type))
+ .collect(Collectors.groupingBy(e -> e.type));
}
- private void drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
+ private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
throws InterruptedException {
- while (queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS) != null) {
- // Just consume the event
+ GsonProvider gsonProvider = plugin.getSysInjector().getInstance(Key.get(GsonProvider.class));
+ SourceAwareEventWrapper event;
+ List<Event> eventsList = new ArrayList<>();
+ while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
+ eventsList.add(event.getEventBody(gsonProvider));
}
+ return eventsList;
}
}