| // Copyright (C) 2018 GerritForge Ltd |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.gerritforge.analytics.engine.events |
| |
| import java.time.format.DateTimeFormatter |
| import java.time.temporal.ChronoField.{MILLI_OF_SECOND, NANO_OF_SECOND} |
| import java.time.{ZoneId, ZonedDateTime} |
| |
| import com.gerritforge.analytics.SparkTestSupport |
| import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary} |
| import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent |
| import org.apache.spark.rdd.RDD |
| import org.scalatest.{Inside, Matchers, WordSpec} |
| |
| class GerritEventsTransformationsSpec extends WordSpec with Matchers with SparkTestSupport with Inside with EventFixture { |
| |
| "tryParseGerritTriggeredEvent" should { |
| |
| implicit val eventParser: GerritJsonEventParser = EventParser |
| |
| "Parse a correctly formed event" in new EventFixture { |
| GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(refUpdated.event) |
| } |
| |
| "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in { |
| val invalidJson = "invalid json string" |
| GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(NotParsableJsonEvent(invalidJson, "unknown token i - Near: i")) |
| } |
| |
| "Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in { |
| val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}""" |
| |
| GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'")) |
| } |
| } |
| |
| "PimpedJsonRDD" should { |
| "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in { |
| val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string")) |
| |
| import GerritEventsTransformations._ |
| |
| jsonRdd.parseEvents(EventParser).collect() should contain only( |
| Right(refUpdated.event), |
| Right(changeMerged.event), |
| Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i")) |
| ) |
| } |
| } |
| |
| "extractUserActivitySummary" should { |
| implicit val eventParser: GerritJsonEventParser = EventParser |
| |
| "Build one UserActivitySummary object if given a series of non-merge commits" in { |
| val events: Seq[ChangeMergedEvent] = Seq( |
| aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1), |
| aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0), |
| aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4), |
| aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2), |
| aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1) |
| ).map(_.event) |
| |
| val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary( |
| changes = events, |
| year = 2018, month = 1, day = 10, hour = 1 |
| ) |
| |
| summaries should have size 1 |
| |
| inside(summaries.head) { |
| case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, last_commit_date, is_merge) => |
| year shouldBe 2018 |
| month shouldBe 1 |
| day shouldBe 10 |
| hour shouldBe 1 |
| name shouldBe "Administrator" |
| email shouldBe "admin@example.com" |
| num_commits shouldBe events.size |
| last_commit_date shouldBe 1005000l |
| is_merge shouldBe false |
| added_lines shouldBe 7 |
| deleted_lines shouldBe 8 |
| commits should contain only( |
| CommitInfo("rev1", 1001000l, false), |
| CommitInfo("rev2", 1002000l, false), |
| CommitInfo("rev3", 1003000l, false), |
| CommitInfo("rev4", 1004000l, false), |
| CommitInfo("rev5", 1005000l, false) |
| ) |
| } |
| } |
| |
| "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in { |
| val events: Seq[ChangeMergedEvent] = Seq( |
| aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true), |
| aChangeMergedEvent("2", 1002l, newRev = "rev2"), |
| aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true), |
| aChangeMergedEvent("4", 1004l, newRev = "rev4"), |
| aChangeMergedEvent("5", 1005l, newRev = "rev5") |
| ).map(_.event) |
| |
| val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary( |
| changes = events, |
| year = 2018, month = 1, day = 10, hour = 1 |
| ) |
| |
| summaries should have size 2 |
| |
| summaries.foreach { summary => |
| inside(summary) { |
| case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _) => |
| year shouldBe 2018 |
| month shouldBe 1 |
| day shouldBe 10 |
| hour shouldBe 1 |
| name shouldBe "Administrator" |
| email shouldBe "admin@example.com" |
| } |
| } |
| |
| summaries.foreach { summary => |
| inside(summary) { |
| case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, false) => |
| num_commits shouldBe 3 |
| last_commit_date shouldBe 1005000l |
| commits should contain only( |
| CommitInfo("rev2", 1002000l, false), |
| CommitInfo("rev4", 1004000l, false), |
| CommitInfo("rev5", 1005000l, false) |
| ) |
| |
| case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, true) => |
| num_commits shouldBe 2 |
| last_commit_date shouldBe 1003000l |
| commits should contain only( |
| CommitInfo("rev1", 1001000l, true), |
| CommitInfo("rev3", 1003000l, true) |
| ) |
| } |
| } |
| |
| } |
| } |
| |
| "Pimped Per Project UserActivitySummary RDD" should { |
| "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in { |
| import GerritEventsTransformations._ |
| import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._ |
| import spark.implicits._ |
| |
| val aliasDF = sc.parallelize(Seq( |
| ("stefano_alias", "stefano@galarraga-org.com", "") |
| )).toDF("author", "email", "organization") |
| |
| val expectedDate = System.currentTimeMillis |
| |
| val analyticsJobOutput = |
| sc.parallelize(Seq( |
| "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)), |
| expectedDate, false) |
| )) |
| .asEtlDataFrame(sql) |
| .addOrganization() |
| .handleAliases(Some(aliasDF)) |
| .dropCommits |
| |
| val expected = sc.parallelize(Seq( |
| ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, "galarraga-org") |
| )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files", |
| "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization") |
| |
| analyticsJobOutput.collect() should contain theSameElementsAs expected.collect() |
| } |
| } |
| |
| "removeEventsForCommits" should { |
| "remove any event modifying a gerrit repo to go toward the commits to be excluded" in { |
| import GerritEventsTransformations._ |
| |
| val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep") |
| val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2") |
| val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(Seq( |
| aRefUpdatedEvent("oldRev", "RevToExclude1"), |
| toKeep1, |
| aRefUpdatedEvent("oldRev", "RevToExclude2"), |
| aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"), |
| toKeep2 |
| ).map(_.event)) |
| |
| events |
| .removeEventsForCommits(sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3"))) |
| .collect() should contain only (toKeep1.event, toKeep2.event ) |
| } |
| } |
| } |
| |
| trait EventFixture { |
| |
| // Forcing early type failures |
| case class JsonEvent[T <: GerritJsonEvent](json: String) { |
| val event: T = EventParser.fromJson(json).get.asInstanceOf[T] |
| } |
| |
| val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(oldRev = "863b64002f2a9922deba69407804a44703c996e0", newRev = "d3131be8d7c920badd28b70d8c039682568c8de5") |
| |
| val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent("I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755") |
| |
| def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) = JsonEvent[RefUpdatedEvent]( |
| s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"}, |
| | "refUpdate":{"oldRev":"$oldRev", |
| | "newRev":"$newRev", |
| | "refName": "refs/heads/master","project":"subcut"}, |
| |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin) |
| |
| def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0", |
| isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0) = JsonEvent[ChangeMergedEvent]( |
| s"""{ |
| |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"}, |
| |"newRev":"$newRev", |
| |"patchSet":{ |
| | "number":1, |
| | "revision":"$newRev", |
| | "parents": ${if (isMergeCommit) """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]""" else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""}, |
| | "ref":"refs/changes/01/1/1", |
| | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"}, |
| | "createdOn":1516530259, |
| | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""}, |
| | "isDraft":false, |
| | "kind":"REWORK", |
| | "sizeInsertions":$insertions, |
| | "sizeDeletions":$deletions |
| |}, |
| |"change":{ |
| | "project":"subcut","branch":"master","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events", |
| | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"}, |
| | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId", |
| | "createdOn":1516530259,"status":"MERGED" |
| |}, |
| |"project":{"name":"subcut"}, |
| |"refName":"refs/heads/master", |
| |"changeKey":{"id":"$changeId"}, |
| |"type":"change-merged", |
| |"eventCreatedOn": $createdOnInSecs |
| |}""".stripMargin) |
| |
| } |