| // Copyright (C) 2018 GerritForge Ltd |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.gerritforge.analytics.gitcommits.engine.events |
| import com.gerritforge.analytics.SparkTestSupport |
| import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary} |
| import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent |
| import org.apache.spark.rdd.RDD |
| import org.scalatest.{Inside, Matchers, WordSpec} |
| |
/**
  * Exercises the helpers exposed by `GerritEventsTransformations` using the JSON events built
  * by `EventFixture`: parsing of a single JSON event, parsing of an RDD of JSON events,
  * aggregation of change-merged events into `UserActivitySummary` objects, conversion of the
  * per-project summaries into a DataFrame and removal of events by revision.
  */
class GerritEventsTransformationsSpec
    extends WordSpec
    with Matchers
    with SparkTestSupport
    with Inside
    with EventFixture {

  "tryParseGerritTriggeredEvent" should {

    implicit val eventParser: GerritJsonEventParser = EventParser

    // The fixture values come from the EventFixture mixed into this class,
    // exactly as in every other test of this spec.
    "Parse a correctly formed event" in {
      GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(
        refUpdated.event)
    }

    "Return a description of the failure with the original event source in a Left object if the JSON provided is invalid" in {
      val invalidJson = "invalid json string"

      GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(
        NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
    }

    "Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in {
      val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""

      GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(
        NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
    }
  }

  "PimpedJsonRDD" should {
    "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
      val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))

      import GerritEventsTransformations._

      jsonRdd.parseEvents(EventParser).collect() should contain only (
        Right(refUpdated.event),
        Right(changeMerged.event),
        Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
      )
    }
  }

  "extractUserActivitySummary" should {
    implicit val eventParser: GerritJsonEventParser = EventParser

    "Build one UserActivitySummary object if given a series of non-merge commits" in {
      val events: Seq[ChangeMergedEvent] = Seq(
        aChangeMergedEvent("1",
                           1001L,
                           newRev = "rev1",
                           insertions = 2,
                           deletions = 1,
                           branch = "stable-1.14"),
        aChangeMergedEvent("2",
                           1002L,
                           newRev = "rev2",
                           insertions = 3,
                           deletions = 0,
                           branch = "stable-1.14"),
        aChangeMergedEvent("3",
                           1003L,
                           newRev = "rev3",
                           insertions = 1,
                           deletions = 4,
                           branch = "stable-1.14"),
        aChangeMergedEvent("4",
                           1004L,
                           newRev = "rev4",
                           insertions = 0,
                           deletions = 2,
                           branch = "stable-1.14"),
        aChangeMergedEvent("5",
                           1005L,
                           newRev = "rev5",
                           insertions = 1,
                           deletions = 1,
                           branch = "stable-1.14")
      ).map(_.event)

      val summaries: Iterable[UserActivitySummary] =
        GerritEventsTransformations.extractUserActivitySummary(
          changes = events,
          year = 2018,
          month = 1,
          day = 10,
          hour = 1
        )

      summaries should have size 1

      inside(summaries.head) {
        case UserActivitySummary(year,
                                 month,
                                 day,
                                 hour,
                                 name,
                                 email,
                                 num_commits,
                                 _,
                                 _,
                                 added_lines,
                                 deleted_lines,
                                 commits,
                                 branches,
                                 last_commit_date,
                                 is_merge) =>
          year shouldBe 2018
          month shouldBe 1
          day shouldBe 10
          hour shouldBe 1
          name shouldBe "Administrator"
          email shouldBe "admin@example.com"
          num_commits shouldBe events.size
          // Events are created with timestamps in seconds; the summary is asserted in millis.
          last_commit_date shouldBe 1005000L
          is_merge shouldBe false
          added_lines shouldBe 7
          deleted_lines shouldBe 8
          commits should contain only (
            CommitInfo("rev1", 1001000L, false),
            CommitInfo("rev2", 1002000L, false),
            CommitInfo("rev3", 1003000L, false),
            CommitInfo("rev4", 1004000L, false),
            CommitInfo("rev5", 1005000L, false)
          )
          branches should contain only "stable-1.14"
      }
    }

    "Build two UserActivitySummary objects if given a mixed series of merge and non merge commits" in {
      val events: Seq[ChangeMergedEvent] = Seq(
        aChangeMergedEvent("1", 1001L, newRev = "rev1", isMergeCommit = true),
        aChangeMergedEvent("2", 1002L, newRev = "rev2"),
        aChangeMergedEvent("3", 1003L, newRev = "rev3", isMergeCommit = true),
        aChangeMergedEvent("4", 1004L, newRev = "rev4"),
        aChangeMergedEvent("5", 1005L, newRev = "rev5")
      ).map(_.event)

      val summaries: Iterable[UserActivitySummary] =
        GerritEventsTransformations.extractUserActivitySummary(
          changes = events,
          year = 2018,
          month = 1,
          day = 10,
          hour = 1
        )

      summaries should have size 2

      // Fields that must be identical in both the merge and the non-merge summary.
      summaries.foreach { summary =>
        inside(summary) {
          case UserActivitySummary(year,
                                   month,
                                   day,
                                   hour,
                                   name,
                                   email,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _) =>
            year shouldBe 2018
            month shouldBe 1
            day shouldBe 10
            hour shouldBe 1
            name shouldBe "Administrator"
            email shouldBe "admin@example.com"
        }
      }

      // Fields that depend on the is_merge flag (last positional field of the summary).
      summaries.foreach { summary =>
        inside(summary) {
          case UserActivitySummary(_,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   num_commits,
                                   _,
                                   _,
                                   _,
                                   _,
                                   commits,
                                   _,
                                   last_commit_date,
                                   false) =>
            num_commits shouldBe 3
            last_commit_date shouldBe 1005000L
            commits should contain only (
              CommitInfo("rev2", 1002000L, false),
              CommitInfo("rev4", 1004000L, false),
              CommitInfo("rev5", 1005000L, false)
            )

          case UserActivitySummary(_,
                                   _,
                                   _,
                                   _,
                                   _,
                                   _,
                                   num_commits,
                                   _,
                                   _,
                                   _,
                                   _,
                                   commits,
                                   _,
                                   last_commit_date,
                                   true) =>
            num_commits shouldBe 2
            last_commit_date shouldBe 1003000L
            commits should contain only (
              CommitInfo("rev1", 1001000L, true),
              CommitInfo("rev3", 1003000L, true)
            )
        }
      }

    }
  }

  "Pimped Per Project UserActivitySummary RDD" should {
    "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
      import GerritEventsTransformations._
      import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
      import spark.implicits._

      val aliasDF = sc
        .parallelize(
          Seq(
            ("stefano_alias", "stefano@galarraga-org.com", "")
          ))
        .toDF("author", "email", "organization")

      val expectedDate = System.currentTimeMillis

      val analyticsJobOutput =
        sc.parallelize(
            Seq(
              "project1" -> UserActivitySummary(
                2018,
                1,
                20,
                10,
                "Stefano",
                "stefano@galarraga-org.com",
                1,
                2,
                1,
                10,
                4,
                Array(CommitInfo("sha1", expectedDate, false)),
                Array("master", "stable-2.14"),
                expectedDate,
                false
              )
            ))
          .asEtlDataFrame(sql)
          .addOrganization()
          .handleAliases(Some(aliasDF))
          .dropCommits

      val expected = sc
        .parallelize(
          Seq(
            ("project1",
             "stefano_alias",
             "stefano@galarraga-org.com",
             2018,
             1,
             20,
             10,
             2,
             1,
             10,
             4,
             1,
             expectedDate,
             false,
             Array("master", "stable-2.14"),
             "galarraga-org")
          ))
        .toDF(
          "project",
          "author",
          "email",
          "year",
          "month",
          "day",
          "hour",
          "num_files",
          "num_distinct_files",
          "added_lines",
          "deleted_lines",
          "num_commits",
          "last_commit_date",
          "is_merge",
          "branches",
          "organization"
        )

      val collected = analyticsJobOutput.collect()

      collected should contain theSameElementsAs expected.collect()
    }
  }

  "removeEventsForCommits" should {
    "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
      import GerritEventsTransformations._

      val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
      val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
      val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(
        Seq(
          aRefUpdatedEvent("oldRev", "RevToExclude1"),
          toKeep1,
          aRefUpdatedEvent("oldRev", "RevToExclude2"),
          aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
          toKeep2
        ).map(_.event))

      events
        .removeEventsForCommits(
          sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
        .collect() should contain only (toKeep1.event, toKeep2.event)
    }
  }
}
| |
/**
  * Test fixture providing Gerrit events both as raw JSON and in the form produced by
  * `EventParser.fromJson`, together with factories to build customised events.
  */
trait EventFixture {
  import scala.reflect.{ClassTag, classTag}

  /**
    * A JSON document paired with the event parsed from it.
    *
    * The event is parsed eagerly when the instance is created, so both a parsing failure and
    * a parsed event that is not a `T` are reported at construction time. The `ClassTag` is
    * needed because a plain `asInstanceOf[T]` on an erased type parameter is only checked
    * against the upper bound `GerritJsonEvent`, which defers the failure to the first use.
    *
    * @param json the raw JSON source of the event
    * @tparam T the event type the JSON is expected to parse into
    * @throws ClassCastException if the parsed event is not an instance of `T`
    */
  case class JsonEvent[T <: GerritJsonEvent: ClassTag](json: String) {
    val event: T = EventParser.fromJson(json).get match {
      case typed: T => typed
      case other =>
        throw new ClassCastException(
          s"Expected ${classTag[T].runtimeClass.getName} but parsed ${other.getClass.getName}")
    }
  }

  val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(
    oldRev = "863b64002f2a9922deba69407804a44703c996e0",
    newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")

  val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent(
    "I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")

  /**
    * Builds a `ref-updated` event on `refs/heads/master` of project `subcut`.
    *
    * @param oldRev    value of `refUpdate.oldRev`
    * @param newRev    value of `refUpdate.newRev`
    * @param createdOn value of `eventCreatedOn`
    */
  def aRefUpdatedEvent(oldRev: String,
                       newRev: String,
                       createdOn: Long = 1000L): JsonEvent[RefUpdatedEvent] =
    JsonEvent[RefUpdatedEvent](
      s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
         | "refUpdate":{"oldRev":"$oldRev",
         | "newRev":"$newRev",
         | "refName": "refs/heads/master","project":"subcut"},
         |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)

  /**
    * Builds a `change-merged` event for project `subcut`.
    *
    * @param changeId        used for `change.id`, `changeKey.id` and in the commit message
    * @param createdOnInSecs value of `eventCreatedOn`
    * @param newRev          used for both `newRev` and `patchSet.revision`
    * @param isMergeCommit   when true the patch set lists two parents, otherwise one
    * @param insertions      value of `patchSet.sizeInsertions`
    * @param deletions       value of `patchSet.sizeDeletions`
    * @param branch          used for `change.branch` and as suffix of `refName`
    */
  def aChangeMergedEvent(changeId: String,
                         createdOnInSecs: Long = 1000L,
                         newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
                         isMergeCommit: Boolean = false,
                         insertions: Integer = 0,
                         deletions: Integer = 0,
                         branch: String = "master"): JsonEvent[ChangeMergedEvent] = {
    val parents =
      if (isMergeCommit)
        """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]"""
      else
        """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""

    JsonEvent[ChangeMergedEvent](
      s"""{
         |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
         |"newRev":"$newRev",
         |"patchSet":{
         | "number":1,
         | "revision":"$newRev",
         | "parents": $parents,
         | "ref":"refs/changes/01/1/1",
         | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
         | "createdOn":1516530259,
         | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
         | "isDraft":false,
         | "kind":"REWORK",
         | "sizeInsertions":$insertions,
         | "sizeDeletions":$deletions
         |},
         |"change":{
         | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
         | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
         | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
         | "createdOn":1516530259,"status":"MERGED"
         |},
         |"project":{"name":"subcut"},
         |"refName":"refs/heads/$branch",
         |"changeKey":{"id":"$changeId"},
         |"type":"change-merged",
         |"eventCreatedOn": $createdOnInSecs
         |}""".stripMargin)
  }

}