// blob: bde070550cb2ac4e83e7501d0d102dd56b83c96c [file] [log] [blame] (code-viewer navigation residue, not part of the source)
// Copyright (C) 2018 GerritForge Ltd
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoField.{MILLI_OF_SECOND, NANO_OF_SECOND}
import java.time.{ZoneId, ZonedDateTime}
import{CommitInfo, UserActivitySummary}
import org.apache.spark.rdd.RDD
import org.scalatest.{Inside, Matchers, WordSpec}
// Spec for GerritEventsTransformations. The groups below cover JSON event parsing,
// the RDD parsing helper, extractUserActivitySummary, the conversion of per-project
// summaries to a DataFrame, and removeEventsForCommits.
// Sample events come from the EventFixture trait mixed into the spec.
// NOTE(review): in this extract the lines that close the blocks and argument lists opened
// below are not present (and a few expressions are cut short); restore them from the
// original file before compiling.
class GerritEventsTransformationsSpec extends WordSpec with Matchers with SparkTestSupport with Inside with EventFixture {
// Group: tryParseGerritTriggeredEvent yields Right(event) for a parsable event and
// Left(NotParsableJsonEvent(source, description)) for invalid JSON or an unsupported type.
"tryParseGerritTriggeredEvent" should {
// NOTE(review): presumably consumed implicitly by tryParseGerritTriggeredEvent - confirm against its signature.
implicit val eventParser: GerritJsonEventParser = EventParser
// NOTE(review): the spec already mixes in EventFixture (see the class header), so
// `new EventFixture` builds a second fixture instance just for this case.
"Parse a correctly formed event" in new EventFixture {
GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(refUpdated.event)
// NOTE(review): the description below reads garbled ("in with the original event source a Left object");
// the wording of the unsupported-event case further down looks like the intended form.
"Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
val invalidJson = "invalid json string"
// The expected Left carries the original input together with a failure description.
GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
"Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in {
// Well-formed JSON whose "type" value is expected to be rejected as unsupported.
val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
// Group: parseEvents called on an RDD of raw JSON strings.
// NOTE(review): parseEvents is presumably an extension method made available by the
// GerritEventsTransformations._ import inside the case - confirm.
"PimpedJsonRDD" should {
"Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
// Two fixture events plus one string that is not valid JSON.
val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
import GerritEventsTransformations._
// NOTE(review): only the Left expectation is visible; the expectations for the two valid
// events and the closing parenthesis appear to be missing from this extract.
jsonRdd.parseEvents(EventParser).collect() should contain only(
Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
// Group: extractUserActivitySummary aggregates ChangeMergedEvents into UserActivitySummary
// values for the year/month/day/hour passed in by the caller.
"extractUserActivitySummary" should {
// NOTE(review): presumably required implicitly by the code under test - confirm.
implicit val eventParser: GerritJsonEventParser = EventParser
"Build one UserActivitySummary object if given a series of non-merge commits" in {
// Five non-merge changes; the second argument is the fixture's createdOnInSecs (1001..1005).
// NOTE(review): the lowercase `l` Long suffix is easy to misread as the digit 1 and is
// deprecated since Scala 2.13; prefer `L`.
val events: Seq[ChangeMergedEvent] = Seq(
aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1),
aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0),
aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4),
aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2),
aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1)
// NOTE(review): the `.map(_.event)` (or similar) and closing parenthesis that would turn the
// fixtures into ChangeMergedEvents are not visible in this extract - verify.
val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
changes = events,
year = 2018, month = 1, day = 10, hour = 1
// All five changes are expected to collapse into a single summary.
summaries should have size 1
inside(summaries.head) {
// Positional destructuring; the two `_` fields are not checked by this case.
case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, last_commit_date, is_merge) =>
year shouldBe 2018
month shouldBe 1
day shouldBe 10
hour shouldBe 1
name shouldBe "Administrator"
email shouldBe ""
num_commits shouldBe events.size
// Latest input timestamp (1005) expressed with three more digits, i.e. seconds -> milliseconds.
last_commit_date shouldBe 1005000l
is_merge shouldBe false
// 2 + 3 + 1 + 0 + 1 insertions across the five changes.
added_lines shouldBe 7
// 1 + 0 + 4 + 2 + 1 deletions across the five changes.
deleted_lines shouldBe 8
// One CommitInfo per change: revision, timestamp in milliseconds, merge flag.
commits should contain only(
CommitInfo("rev1", 1001000l, false),
CommitInfo("rev2", 1002000l, false),
CommitInfo("rev3", 1003000l, false),
CommitInfo("rev4", 1004000l, false),
CommitInfo("rev5", 1005000l, false)
"Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
// rev1 and rev3 are merge commits; rev2, rev4 and rev5 are not.
val events: Seq[ChangeMergedEvent] = Seq(
aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
aChangeMergedEvent("2", 1002l, newRev = "rev2"),
aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
aChangeMergedEvent("4", 1004l, newRev = "rev4"),
aChangeMergedEvent("5", 1005l, newRev = "rev5")
val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
changes = events,
year = 2018, month = 1, day = 10, hour = 1
// One summary for the merge commits and one for the non-merge commits.
summaries should have size 2
// First pass: fields expected to be identical in both summaries.
summaries.foreach { summary =>
inside(summary) {
case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _) =>
year shouldBe 2018
month shouldBe 1
day shouldBe 10
hour shouldBe 1
name shouldBe "Administrator"
email shouldBe ""
// Second pass: per-summary expectations, selected by the literal is_merge value in the pattern.
summaries.foreach { summary =>
inside(summary) {
// Non-merge summary: rev2, rev4 and rev5.
case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, false) =>
num_commits shouldBe 3
last_commit_date shouldBe 1005000l
commits should contain only(
CommitInfo("rev2", 1002000l, false),
CommitInfo("rev4", 1004000l, false),
CommitInfo("rev5", 1005000l, false)
// Merge summary: rev1 and rev3.
case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, true) =>
num_commits shouldBe 2
last_commit_date shouldBe 1003000l
commits should contain only(
CommitInfo("rev1", 1001000l, true),
CommitInfo("rev3", 1003000l, true)
// Group: conversion of (project -> UserActivitySummary) data into a DataFrame.
"Pimped Per Project UserActivitySummary RDD" should {
"Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
import GerritEventsTransformations._
import spark.implicits._
// Alias table with the columns author / email / organization.
val aliasDF = sc.parallelize(Seq(
("stefano_alias", "", "")
)).toDF("author", "email", "organization")
// NOTE(review): the next statement is damaged in this extract - the expression between `=`
// and `"UTC"))` is missing. What remains zeroes the milli- and nano-of-second fields.
val expectedDate : ZonedDateTime ="UTC")).`with`(MILLI_OF_SECOND, 0).`with`(NANO_OF_SECOND, 0)
// NOTE(review): only the (project -> summary) pair is visible; the RDD construction and the
// conversion call that would make analyticsJobOutput collectable appear to be missing.
val analyticsJobOutput =
"project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate.toInstant.toEpochMilli, false)),
expectedDate.toInstant.toEpochMilli, false)
// Expected row: author replaced by the alias, last_commit_date rendered as an ISO offset date-time.
// NOTE(review): the expected organization is "galarraga-org" while the alias row above shows
// empty strings - the alias literals may have been altered in this extract; verify.
val expected = sc.parallelize(Seq(
("project1", "stefano_alias", "", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), false, "galarraga-org")
)).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
"added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization")
// Compares the collected rows of both sides with the theSameElementsAs matcher.
analyticsJobOutput.collect() should contain theSameElementsAs expected.collect()
// Group: removeEventsForCommits filters an RDD of events by a set of revisions to exclude.
"removeEventsForCommits" should {
"remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
import GerritEventsTransformations._
// Events whose new revisions are not in the exclusion list passed below.
val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
// NOTE(review): the visible Seq lists only the three events to exclude and is never closed;
// the toKeep1/toKeep2 entries and the receiver of `.removeEventsForCommits` appear to be
// missing from this extract.
val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(Seq(
aRefUpdatedEvent("oldRev", "RevToExclude1"),
aRefUpdatedEvent("oldRev", "RevToExclude2"),
aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
// The revisions to exclude are passed as an RDD of strings.
.removeEventsForCommits(sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
// Only the two events that were meant to be kept may remain.
.collect() should contain only (toKeep1.event, toKeep2.event )
// Shared test fixture: wraps JSON event sources together with their parsed form and
// exposes two ready-made sample events.
// NOTE(review): closing braces are not present in this extract.
trait EventFixture {
// Forcing early type failures
// Pairs a raw JSON string with the event parsed from it by EventParser.
case class JsonEvent[T <: GerritJsonEvent](json: String) {
// Parsed when the JsonEvent is constructed. `.get` presumably throws if the JSON cannot be
// parsed (the return type of fromJson is not visible here).
// NOTE(review): asInstanceOf[T] is an unchecked cast because T is erased, so a wrong T may
// only surface where the value is later used as T - confirm this gives the intended
// "early" failure.
val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
// Sample ref-updated event with fixed old and new revisions.
val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(oldRev = "863b64002f2a9922deba69407804a44703c996e0", newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
// Sample change-merged event with a fixed change id and default values for everything else.
val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent("I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
// Builds a JsonEvent[RefUpdatedEvent] from a JSON template in which oldRev and newRev are
// interpolated into the "refUpdate" object (ref refs/heads/master, project subcut).
// NOTE(review): the lines that open and close the template string are missing from this
// extract, and createdOn is not referenced by the template lines that are visible - verify
// against the original file.
def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) = JsonEvent[RefUpdatedEvent](
| "refUpdate":{"oldRev":"$oldRev",
| "newRev":"$newRev",
| "refName": "refs/heads/master","project":"subcut"},
// Builds a JsonEvent[ChangeMergedEvent] from a JSON template.
//   changeId        - interpolated into the change "id" and into the commit message
//   createdOnInSecs - interpolated into "eventCreatedOn"
//   newRev          - interpolated into the patch set "revision"
//   isMergeCommit   - true emits two parent revisions, false emits one
//   insertions      - interpolated into "sizeInsertions"
//   deletions       - interpolated into "sizeDeletions"
// NOTE(review): insertions/deletions are declared as java.lang.Integer rather than Int.
// NOTE(review): the lines that open and close the template string (and some structural lines
// inside it) are missing, and the template runs past the end of this extract - verify against
// the original file.
def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0) = JsonEvent[ChangeMergedEvent](
| "number":1,
| "revision":"$newRev",
| "parents": ${if (isMergeCommit) """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]""" else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
| "ref":"refs/changes/01/1/1",
| "uploader":{"name":"Administrator","email":"","username":"admin"},
| "createdOn":1516530259,
| "author":{"name":"Stefano Galarraga","email":"","username":""},
| "isDraft":false,
| "kind":"REWORK",
| "sizeInsertions":$insertions,
| "sizeDeletions":$deletions
| "project":"subcut","branch":"master","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
| "owner":{"name":"Administrator","email":"","username":"admin"},
| "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
| "createdOn":1516530259,"status":"MERGED"
|"eventCreatedOn": $createdOnInSecs