Allow aggregation by hashtag

Add the capability of aggregating by the hashtags linked to each change.

Feature: Issue 10202
Change-Id: I06eabbe00ded15fa0d5f47528c46d25f1c4421e4
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
index 37f9edd..cfe7960 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
@@ -99,6 +99,7 @@
       added_lines: Integer,
       deleted_lines: Integer,
       commits: Array[CommitInfo],
+      hash_tags: Array[String],
       branches: Array[String],
       last_commit_date: Long,
       is_merge: Boolean,
@@ -143,7 +144,8 @@
           "json.is_merge as is_merge",
           "json.commits as commits",
           "json.branches as branches",
-          "json.is_bot_like"
+          "json.is_bot_like",
+          "json.hash_tags as hash_tags"
         )
     }
 
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 447102a..b35a8f5 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -85,6 +85,7 @@
       opt[Boolean]('r', "extract-branches") optional () action { (input, c) =>
         c.copy(extractBranches = Some(input))
       } text "enables branches extraction for each commit"
+
     }
 
   cliOptionParser.parse(args, GerritEndpointConfig()) match {
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 3947759..cf7ef08 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -157,7 +157,7 @@
     "year", "month", "day", "hour",
     "num_files", "num_distinct_files", "added_lines", "deleted_lines",
     "num_commits", "last_commit_date",
-    "is_merge", "commits", "branches", "is_bot_like")
+    "is_merge", "commits", "branches", "is_bot_like", "hash_tags")
 
     collected should contain allOf (
       Row(
@@ -179,7 +179,8 @@
           Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0L, false))
         ),
         new mutable.WrappedArray.ofRef(Array("master", "stable-2.14")),
-        false
+        false,
+        null
       ),
       Row(
         "p2",
@@ -203,7 +204,8 @@
           )
         ),
         null,
-        true
+        true,
+        null
       ),
       Row(
         "p3",
@@ -227,11 +229,39 @@
           )
         ),
         null,
-        false
+        false,
+        null
       )
     )
   }
 
+  it should "extract single hashtag" in {
+    import sql.implicits._
+
+    val hashTag = "test-hash-tag"
+    val rdd     = sc.parallelize(Seq(("p1", s"""{"hash_tags": ["$hashTag"] }""")))
+
+    val df = rdd.toDF("project", "json").transformCommitterInfo
+
+    df.count should be(1)
+    df.schema.fields.map(_.name) should contain inOrder ("project", "hash_tags")
+    df.collect.head.getAs[Array[String]]("hash_tags") shouldBe Array(hashTag)
+  }
+
+  it should "extract multiple hashtags" in {
+    import sql.implicits._
+
+    val hashTag1 = "test-hash-tag-1"
+    val hashTag2 = "test-hash-tag-2"
+    val rdd      = sc.parallelize(Seq(("p1", s"""{"hash_tags": ["$hashTag1", "$hashTag2"] }""")))
+
+    val df = rdd.toDF("project", "json").transformCommitterInfo
+
+    df.count should be(1)
+    df.schema.fields.map(_.name) should contain inOrder ("project", "hash_tags")
+    df.collect.head.getAs[Array[String]]("hash_tags") shouldBe Array(hashTag1, hashTag2)
+  }
+
   "handleAliases" should "enrich the data with author from the alias DF" in {
     import spark.implicits._