Allow ETL job to extract branches from analytics

When the `--extract-branches true` argument is provided, the ETL
job also retrieves branch information and stores it in
Elasticsearch.

Feature: Issue 9864
Change-Id: I342290054262eb5e19cfb1c2e5432fe8b6b91df4
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index ff75fb8..a0733bc 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -83,6 +83,7 @@
                                  added_lines: Integer,
                                  deleted_lines: Integer,
                                  commits: Array[CommitInfo],
+                                 branches: Array[String],
                                  last_commit_date: Long,
                                  is_merge: Boolean)
 
@@ -114,7 +115,7 @@
           "json.num_files as num_files", "json.num_distinct_files as num_distinct_files",
           "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
           "json.num_commits as num_commits", "json.last_commit_date as last_commit_date",
-          "json.is_merge as is_merge", "json.commits as commits"
+          "json.is_merge as is_merge", "json.commits as commits", "json.branches as branches"
         )
     }
 
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
index 4b03b72..121b98c 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
@@ -105,6 +105,7 @@
       commits = changes.map(changeMerged =>
         CommitInfo(changeMerged.newRev, changeMerged.eventCreatedOn * 1000, changeMerged.patchSet.parents.size > 1)
       ).toArray,
+      branches = changes.map(changeMerged => changeMerged.change.branch).toArray,
       last_commit_date = changes.map(_.eventCreatedOn).max * 1000,
       added_lines = changes.map(_.patchSet.sizeInsertions).sum,
       deleted_lines = changes.map(_.patchSet.sizeDeletions).sum,
@@ -142,6 +143,7 @@
         added_lines = 0,
         deleted_lines = 0,
         commits = Array.empty,
+        branches = Array.empty,
         last_commit_date = 0l,
         is_merge = false)
 
@@ -170,9 +172,9 @@
 
     self.map { case (project, summary) =>
       (project, summary.name, summary.email, summary.year, summary.month, summary.day, summary.hour, summary.num_files, summary.num_distinct_files,
-        summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits)
+        summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits, summary.branches)
     }.toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
-      "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits")
+      "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits", "branches")
   }
 
   def getContributorStatsFromGerritEvents(events: RDD[GerritRefHasNewRevisionEvent],
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 9549155..099efac 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -95,6 +95,10 @@
       c.copy(password = Some(input))
     } text "Gerrit API Password"
 
+    opt[Boolean]('r', "extract-branches") optional() action { (input, c) =>
+      c.copy(extractBranches = Some(input))
+    } text "enables branches extraction for each commit"
+
   }
 
   cliOptionParser.parse(args, GerritEndpointConfig()) match {
@@ -165,7 +169,7 @@
     val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
       val lastAggregationDate = firstEventDate.plusMonths(1)
       if (lastAggregationDate.isBefore(LocalDate.now())) {
-        logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events ara available until $firstEventDate")
+        logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
         config.copy(until = Some(lastAggregationDate))
       } else {
         config
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 596e0a5..d76743d 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -31,7 +31,8 @@
                                 eventsPath: Option[String] = None,
                                 eventsFailureOutputPath: Option[String] = None,
                                 username: Option[String] = None,
-                                password: Option[String] = None
+                                password: Option[String] = None,
+                                extractBranches: Option[Boolean] = None
                                ) {
 
 
@@ -47,8 +48,12 @@
 
   @transient
   private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
-  val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
-    .flatMap(queryOpt).mkString("?", "&", "")
+  val queryString = Seq(
+    "since" -> since.map(format.format),
+    "until" -> until.map(format.format),
+    "aggregate" -> aggregate,
+    "extract-branches" -> extractBranches.map(_.toString)
+  ).flatMap(queryOpt).mkString("?", "&", "")
 
   def contributorsUrl(projectName: String): Option[String] =
     baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
index b2d9da5..9832df5 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
+++ b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
@@ -40,6 +40,9 @@
   @ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
   var emailAlias: String = null
 
+  @ArgOption(name = "--extract-branches", aliases = Array("-r"), usage = "enables branches extraction for each commit")
+  var extractBranches: Boolean = false
+
   override def run() {
     implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
       beginDate, endDate, aggregate, emailAlias)
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 8678f6b..ec9d3e5 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -118,32 +118,32 @@
     import sql.implicits._
 
     val rdd = sc.parallelize(Seq(
-      ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}] }"""),
-      ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}"""),
+      ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}], "branches": ["master", "stable-2.14"] }"""),
+      ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}, "branches":[]"""),
       // last commit is missing hour,day,month,year to check optionality
-      ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}""")
+      ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}, "branches":[]""")
     ))
 
     val df = rdd.toDF("project", "json")
       .transformCommitterInfo
 
     df.count should be(3)
-    val collected = df.collect
+    val collected: Array[Row] = df.collect
 
     df.schema.fields.map(_.name) should contain inOrder(
       "project", "author", "email",
       "year", "month", "day", "hour",
       "num_files", "num_distinct_files", "added_lines", "deleted_lines",
       "num_commits", "last_commit_date",
-      "is_merge", "commits")
+      "is_merge", "commits", "branches")
 
     collected should contain allOf(
       Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 2, 1, 1, 1, 0, false,
-        new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false)))),
+        new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false))), new mutable.WrappedArray.ofRef(Array("master", "stable-2.14"))),
       Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 3, 1, 1, 428, 1500000000000L, true,
-        new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true)))),
+        new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true))), null),
       Row("p3", "c", "c@mail.com", null, null, null, null, 4, 2, 1, 1, 12, 1600000000000L, true,
-        new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))))
+        new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))), null)
     )
   }
 
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
index 05b98dd..434662f 100644
--- a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
@@ -14,10 +14,6 @@
 
 package com.gerritforge.analytics.engine.events
 
-import java.time.format.DateTimeFormatter
-import java.time.temporal.ChronoField.{MILLI_OF_SECOND, NANO_OF_SECOND}
-import java.time.{ZoneId, ZonedDateTime}
-
 import com.gerritforge.analytics.SparkTestSupport
 import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
 import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
@@ -65,11 +61,11 @@
 
     "Build one UserActivitySummary object if given a series of non-merge commits" in {
       val events: Seq[ChangeMergedEvent] = Seq(
-        aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1),
-        aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0),
-        aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4),
-        aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2),
-        aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1)
+        aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1, branch = "stable-1.14"),
+        aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0, branch = "stable-1.14"),
+        aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4, branch = "stable-1.14"),
+        aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2, branch = "stable-1.14"),
+        aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1, branch = "stable-1.14")
       ).map(_.event)
 
       val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
@@ -80,7 +76,7 @@
       summaries should have size 1
 
       inside(summaries.head) {
-        case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, last_commit_date, is_merge) =>
+        case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, branches, last_commit_date, is_merge) =>
           year shouldBe 2018
           month shouldBe 1
           day shouldBe 10
@@ -99,6 +95,7 @@
             CommitInfo("rev4", 1004000l, false),
             CommitInfo("rev5", 1005000l, false)
           )
+          branches should contain only "stable-1.14"
       }
     }
 
@@ -120,7 +117,7 @@
 
       summaries.foreach { summary =>
         inside(summary) {
-          case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _) =>
+          case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _, _) =>
             year shouldBe 2018
             month shouldBe 1
             day shouldBe 10
@@ -132,7 +129,7 @@
 
       summaries.foreach { summary =>
         inside(summary) {
-          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, false) =>
+          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, false) =>
             num_commits shouldBe 3
             last_commit_date shouldBe 1005000l
             commits should contain only(
@@ -141,7 +138,7 @@
               CommitInfo("rev5", 1005000l, false)
             )
 
-          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, true) =>
+          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, true) =>
             num_commits shouldBe 2
             last_commit_date shouldBe 1003000l
             commits should contain only(
@@ -169,7 +166,7 @@
       val analyticsJobOutput =
         sc.parallelize(Seq(
           "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)),
-            expectedDate, false)
+            Array("master", "stable-2.14"), expectedDate, false)
         ))
           .asEtlDataFrame(sql)
           .addOrganization()
@@ -177,11 +174,13 @@
           .dropCommits
 
       val expected = sc.parallelize(Seq(
-        ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, "galarraga-org")
+        ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, Array("master", "stable-2.14"), "galarraga-org")
       )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
-        "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization")
+        "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "branches", "organization")
 
-      analyticsJobOutput.collect() should contain theSameElementsAs expected.collect()
+      val collected = analyticsJobOutput.collect()
+
+      collected should contain theSameElementsAs expected.collect()
     }
   }
 
@@ -225,7 +224,7 @@
        |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
 
   def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
-                         isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0) = JsonEvent[ChangeMergedEvent](
+                         isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0, branch: String = "master") = JsonEvent[ChangeMergedEvent](
     s"""{
        |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
        |"newRev":"$newRev",
@@ -243,13 +242,13 @@
        | "sizeDeletions":$deletions
        |},
        |"change":{
-       | "project":"subcut","branch":"master","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
+       | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
        | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
        | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
        | "createdOn":1516530259,"status":"MERGED"
        |},
        |"project":{"name":"subcut"},
-       |"refName":"refs/heads/master",
+       |"refName":"refs/heads/$branch",
        |"changeKey":{"id":"$changeId"},
        |"type":"change-merged",
        |"eventCreatedOn": $createdOnInSecs