Allow ETL job to extract branches from analytics
When the `--extract-branches true` argument is provided, the
ETL job also retrieves branch information and stores it in
Elasticsearch.
Feature: Issue 9864
Change-Id: I342290054262eb5e19cfb1c2e5432fe8b6b91df4
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index ff75fb8..a0733bc 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -83,6 +83,7 @@
added_lines: Integer,
deleted_lines: Integer,
commits: Array[CommitInfo],
+ branches: Array[String],
last_commit_date: Long,
is_merge: Boolean)
@@ -114,7 +115,7 @@
"json.num_files as num_files", "json.num_distinct_files as num_distinct_files",
"json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
"json.num_commits as num_commits", "json.last_commit_date as last_commit_date",
- "json.is_merge as is_merge", "json.commits as commits"
+ "json.is_merge as is_merge", "json.commits as commits", "json.branches as branches"
)
}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
index 4b03b72..121b98c 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
@@ -105,6 +105,7 @@
commits = changes.map(changeMerged =>
CommitInfo(changeMerged.newRev, changeMerged.eventCreatedOn * 1000, changeMerged.patchSet.parents.size > 1)
).toArray,
+ branches = changes.map(changeMerged => changeMerged.change.branch).distinct.toArray,
last_commit_date = changes.map(_.eventCreatedOn).max * 1000,
added_lines = changes.map(_.patchSet.sizeInsertions).sum,
deleted_lines = changes.map(_.patchSet.sizeDeletions).sum,
@@ -142,6 +143,7 @@
added_lines = 0,
deleted_lines = 0,
commits = Array.empty,
+ branches = Array.empty,
last_commit_date = 0l,
is_merge = false)
@@ -170,9 +172,9 @@
self.map { case (project, summary) =>
(project, summary.name, summary.email, summary.year, summary.month, summary.day, summary.hour, summary.num_files, summary.num_distinct_files,
- summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits)
+ summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits, summary.branches)
}.toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
- "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits")
+ "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits", "branches")
}
def getContributorStatsFromGerritEvents(events: RDD[GerritRefHasNewRevisionEvent],
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 9549155..099efac 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -95,6 +95,10 @@
c.copy(password = Some(input))
} text "Gerrit API Password"
+ opt[Boolean]('r', "extract-branches") optional() action { (input, c) =>
+ c.copy(extractBranches = Some(input))
+ } text "enables branches extraction for each commit"
+
}
cliOptionParser.parse(args, GerritEndpointConfig()) match {
@@ -165,7 +169,7 @@
val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
val lastAggregationDate = firstEventDate.plusMonths(1)
if (lastAggregationDate.isBefore(LocalDate.now())) {
- logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events ara available until $firstEventDate")
+ logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
config.copy(until = Some(lastAggregationDate))
} else {
config
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 596e0a5..d76743d 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -31,7 +31,8 @@
eventsPath: Option[String] = None,
eventsFailureOutputPath: Option[String] = None,
username: Option[String] = None,
- password: Option[String] = None
+ password: Option[String] = None,
+ extractBranches: Option[Boolean] = None
) {
@@ -47,8 +48,12 @@
@transient
private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
- val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
- .flatMap(queryOpt).mkString("?", "&", "")
+ val queryString = Seq(
+ "since" -> since.map(format.format),
+ "until" -> until.map(format.format),
+ "aggregate" -> aggregate,
+ "extract-branches" -> extractBranches.map(_.toString)
+ ).flatMap(queryOpt).mkString("?", "&", "")
def contributorsUrl(projectName: String): Option[String] =
baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
index b2d9da5..9832df5 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
+++ b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
@@ -40,6 +40,9 @@
@ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
var emailAlias: String = null
+ @ArgOption(name = "--extract-branches", aliases = Array("-r"), usage = "enables branches extraction for each commit")
+ var extractBranches: Boolean = false
+
override def run() {
- implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
- beginDate, endDate, aggregate, emailAlias)
+ implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
+ beginDate, endDate, aggregate, emailAlias, extractBranches = if (extractBranches) Some(true) else None)
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 8678f6b..ec9d3e5 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -118,32 +118,32 @@
import sql.implicits._
val rdd = sc.parallelize(Seq(
- ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}] }"""),
- ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}"""),
+ ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}], "branches": ["master", "stable-2.14"] }"""),
+ ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}], "branches":[]}"""),
// last commit is missing hour,day,month,year to check optionality; branches is absent to check the missing-field case
("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}""")
))
val df = rdd.toDF("project", "json")
.transformCommitterInfo
df.count should be(3)
- val collected = df.collect
+ val collected: Array[Row] = df.collect
df.schema.fields.map(_.name) should contain inOrder(
"project", "author", "email",
"year", "month", "day", "hour",
"num_files", "num_distinct_files", "added_lines", "deleted_lines",
"num_commits", "last_commit_date",
- "is_merge", "commits")
+ "is_merge", "commits", "branches")
collected should contain allOf(
Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 2, 1, 1, 1, 0, false,
- new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false)))),
+ new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false))), new mutable.WrappedArray.ofRef(Array("master", "stable-2.14"))),
Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 3, 1, 1, 428, 1500000000000L, true,
- new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true)))),
+ new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true))), new mutable.WrappedArray.ofRef[String](Array.empty[String])),
Row("p3", "c", "c@mail.com", null, null, null, null, 4, 2, 1, 1, 12, 1600000000000L, true,
- new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))))
+ new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))), null)
)
}
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
index 05b98dd..434662f 100644
--- a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
@@ -14,10 +14,6 @@
package com.gerritforge.analytics.engine.events
-import java.time.format.DateTimeFormatter
-import java.time.temporal.ChronoField.{MILLI_OF_SECOND, NANO_OF_SECOND}
-import java.time.{ZoneId, ZonedDateTime}
-
import com.gerritforge.analytics.SparkTestSupport
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
@@ -65,11 +61,11 @@
"Build one UserActivitySummary object if given a series of non-merge commits" in {
val events: Seq[ChangeMergedEvent] = Seq(
- aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1),
- aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0),
- aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4),
- aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2),
- aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1)
+ aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1, branch = "stable-1.14"),
+ aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0, branch = "stable-1.14"),
+ aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4, branch = "stable-1.14"),
+ aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2, branch = "stable-1.14"),
+ aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1, branch = "stable-1.14")
).map(_.event)
val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
@@ -80,7 +76,7 @@
summaries should have size 1
inside(summaries.head) {
- case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, last_commit_date, is_merge) =>
+ case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, branches, last_commit_date, is_merge) =>
year shouldBe 2018
month shouldBe 1
day shouldBe 10
@@ -99,6 +95,7 @@
CommitInfo("rev4", 1004000l, false),
CommitInfo("rev5", 1005000l, false)
)
+ branches should contain only "stable-1.14"
}
}
@@ -120,7 +117,7 @@
summaries.foreach { summary =>
inside(summary) {
- case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _) =>
+ case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _, _) =>
year shouldBe 2018
month shouldBe 1
day shouldBe 10
@@ -132,7 +129,7 @@
summaries.foreach { summary =>
inside(summary) {
- case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, false) =>
+ case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, false) =>
num_commits shouldBe 3
last_commit_date shouldBe 1005000l
commits should contain only(
@@ -141,7 +138,7 @@
CommitInfo("rev5", 1005000l, false)
)
- case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, true) =>
+ case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, true) =>
num_commits shouldBe 2
last_commit_date shouldBe 1003000l
commits should contain only(
@@ -169,7 +166,7 @@
val analyticsJobOutput =
sc.parallelize(Seq(
"project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)),
- expectedDate, false)
+ Array("master", "stable-2.14"), expectedDate, false)
))
.asEtlDataFrame(sql)
.addOrganization()
@@ -177,11 +174,13 @@
.dropCommits
val expected = sc.parallelize(Seq(
- ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, "galarraga-org")
+ ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, Array("master", "stable-2.14"), "galarraga-org")
)).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
- "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization")
+ "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "branches", "organization")
- analyticsJobOutput.collect() should contain theSameElementsAs expected.collect()
+ val collected = analyticsJobOutput.collect()
+
+ collected should contain theSameElementsAs expected.collect()
}
}
@@ -225,7 +224,7 @@
|"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
- isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0) = JsonEvent[ChangeMergedEvent](
+ isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0, branch: String = "master") = JsonEvent[ChangeMergedEvent](
s"""{
|"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
|"newRev":"$newRev",
@@ -243,13 +242,13 @@
| "sizeDeletions":$deletions
|},
|"change":{
- | "project":"subcut","branch":"master","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
+ | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
| "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
| "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
| "createdOn":1516530259,"status":"MERGED"
|},
|"project":{"name":"subcut"},
- |"refName":"refs/heads/master",
+ |"refName":"refs/heads/$branch",
|"changeKey":{"id":"$changeId"},
|"type":"change-merged",
|"eventCreatedOn": $createdOnInSecs