Add organization information

Use organization value specified in CSV file or extract it
from email as previously implemented.

Change-Id: I890fdbad48c9086c03c980d98742a7236776f3c5
diff --git a/ b/
index 4504d44..65d1059 100644
--- a/
+++ b/
@@ -25,21 +25,26 @@
     the system temporary directory
 - -a --email-aliases (*optional*) "emails to author alias" input data path.
-  CSVs with 2 columns are expected in input.
+  CSVs with 3 columns are expected in input.
   Here an example of the required files structure:
-  author,email
-  John Smith,
-  John Smith,
-  David Smith,
-  David Smith,
+  author,email,organization
+  John Smith,,John's Company
+  John Smith,,John's Company
+  David Smith,,Indipendent
+  David Smith,,Indipendent
-  You can use the following command to quickly extract the list of authors and emails to create an input CSV file:
+  You can use the following command to quickly extract the list of authors and emails to create part of an input CSV file:
   echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce"|sort |uniq )" > /tmp/my_aliases.csv
+  Once you have it, you just have to add the organization column.
+  *NOTE:*
+  * **organization** will be extracted from the committer email if not specified
+  * **author** will be defaulted to the committer name if not specified
 ## Development environment
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index c6ca959..c9db628 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -55,7 +55,7 @@
-  def getEmailAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
+  def getAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = { { path =>
         .option("header", "true")
@@ -99,26 +99,31 @@
           "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
-    def handleAuthorEMailAliases(emailAliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
-      emailAliasesDF
+    def handleAliases(aliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+      aliasesDF
         .map{ eaDF =>
                 val renamedAliasesDF = eaDF
                                         .withColumnRenamed("email", "email_alias")
                                         .withColumnRenamed("author", "author_alias")
+                                        .withColumnRenamed("organization", "organization_alias")
                 df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
+                  .withColumn("organization",
+                    when(renamedAliasesDF("organization_alias").notEqual(""), renamedAliasesDF("organization_alias"))
+                      .otherwise(df("organization")) )
                   .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
-                  .drop("email_alias","author_alias")
+                  .drop("email_alias","author_alias", "organization_alias")
     def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
       df.withColumn(columnName, longDateToISOUdf(col(columnName)))
-    def addOrganization()(implicit spark: SparkSession): DataFrame = {
+    def addOrganization()(implicit spark: SparkSession): DataFrame =
       df.withColumn("organization", emailToDomainUdf(col("email")))
-    }
   private def emailToDomain(email: String): String = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 02943b6..5a658ef 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -69,16 +69,17 @@
     import spark.sqlContext.implicits._ // toDF
     val sc = spark.sparkContext
     val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
-    val emailAliasesDF = getEmailAliasDF(config.emailAlias)
+    val aliasesDF = getAliasDF(config.emailAlias)
       .toDF("project", "json")
-      .handleAuthorEMailAliases(emailAliasesDF)
-      .convertDates("last_commit_date")
+      .handleAliases(aliasesDF)
+      .convertDates("last_commit_date")
   def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
     import org.elasticsearch.spark.sql._
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 965f036..f940d75 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -118,26 +118,57 @@
-  "handleAuthorEMailAliases" should "enrich the data with author from the alias DF" in {
+  "handleAliases" should "enrich the data with author from the alias DF" in {
     import spark.implicits._
-    val goodAliasDF = sc.parallelize(Seq(
-      ("aliased_author", "")
-    )).toDF("author", "email")
+    val aliasDF = sc.parallelize(Seq(
+      ("aliased_author", "", "")
+    )).toDF("author", "email", "organization")
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_from_name_a", ""),
-      ("author_from_name_b", "")
-    )).toDF("author", "email")
+      ("author_from_name_a", "", ""),
+      ("author_from_name_b", "", "")
+    )).toDF("author", "email", "organization")
     val expectedDF = sc.parallelize(Seq(
-      ("author_from_name_a", ""),
-      ("aliased_author", "")
-    )).toDF("author", "email")
+      ("author_from_name_a", "", ""),
+      ("aliased_author", "", "")
+    )).toDF("author", "email", "organization")
-    val df = inputSampleDF.handleAuthorEMailAliases(Some(goodAliasDF))
+    val df = inputSampleDF.handleAliases(Some(aliasDF)) should contain allOf(
-      "author", "email")
+      "author", "email", "organization")
+    df.collect should contain theSameElementsAs expectedDF.collect
+  }
+  it should "enrich the data with organization from the alias DF when available" in {
+    import spark.implicits._
+    val aliasDF = sc.parallelize(Seq(
+      ("aliased_author_with_organization", "", "aliased_organization"),
+      ("aliased_author_empty_organization", "", ""),
+      ("aliased_author_null_organization", "", null)
+    )).toDF("author", "email", "organization")
+    val inputSampleDF = sc.parallelize(Seq(
+      ("author_from_name_a", "", ""),
+      ("author_from_name_b", "", ""),
+      ("author_from_name_c", "", "")
+    )).toDF("author", "email", "organization")
+    val expectedDF = sc.parallelize(Seq(
+      ("aliased_author_with_organization", "", "aliased_organization"),
+      ("aliased_author_empty_organization", "", ""),
+      ("aliased_author_null_organization", "", "")
+    )).toDF("author", "email", "organization")
+    val df = inputSampleDF.handleAliases(Some(aliasDF))
+ should contain allOf(
+      "author", "email", "organization")
     df.collect should contain theSameElementsAs expectedDF.collect
@@ -145,27 +176,27 @@
   it should "return correct columns when alias DF is defined" in {
     import spark.implicits._
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_name", "")
-    )).toDF("author", "email")
+      ("author_name", "", "an_organization")
+    )).toDF("author", "email", "organization")
     val aliasDF = sc.parallelize(Seq(
-      ("a_random_author", "")
-    )).toDF("author", "email")
+      ("a_random_author", "", "a_random_organization")
+    )).toDF("author", "email", "organization")
-    val df = inputSampleDF.handleAuthorEMailAliases(Some(aliasDF))
+    val df = inputSampleDF.handleAliases(Some(aliasDF))
- should contain allOf("author", "email")
+ should contain allOf("author", "email", "organization")
   it should "return correct columns when alias DF is not defined" in {
     import spark.implicits._
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_name", "")
-    )).toDF("author", "email")
+      ("author_name", "", "an_organization")
+    )).toDF("author", "email", "organization")
-    val df = inputSampleDF.handleAuthorEMailAliases(None)
+    val df = inputSampleDF.handleAliases(None)
- should contain allOf("author", "email")
+ should contain allOf("author", "email", "organization")
   "addOrganization" should "compute organization column from the email" in {
@@ -188,7 +219,6 @@
       Row("not an email", ""),
       Row("email@domain", "domain")
   "convertDates" should "process specific column from Long to ISO date" in {