Add organization information

Use the organization value specified in the CSV file, or fall back to
extracting it from the email as previously implemented.

Change-Id: I890fdbad48c9086c03c980d98742a7236776f3c5
diff --git a/README.md b/README.md
index 4504d44..65d1059 100644
--- a/README.md
+++ b/README.md
@@ -25,21 +25,33 @@
     the system temporary directory
 - -a --email-aliases (*optional*) "emails to author alias" input data path.
 
-  CSVs with 2 columns are expected in input.
+  CSVs with 3 columns are expected as input.
 
-  Here an example of the required files structure:
+  Here is an example of the required file structure:
   ```csv
-  author,email
-  John Smith,john@email.com
-  John Smith,john@anotheremail.com
-  David Smith,david.smith@email.com
-  David Smith,david@myemail.com
+  author,email,organization
+  John Smith,john@email.com,John's Company
+  John Smith,john@anotheremail.com,John's Company
+  David Smith,david.smith@email.com,Independent
+  David Smith,david@myemail.com,Independent
   ```
 
-  You can use the following command to quickly extract the list of authors and emails to create an input CSV file:
+  You can use the following command to quickly extract the list of authors and emails to create part of an input CSV file:
   ```bash
   echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce"|sort |uniq )" > /tmp/my_aliases.csv
   ```
+  Once you have it, add the organization column.
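+  For example, the following one-liner (illustrative, not part of the tool)
+  appends an empty organization column that you can then fill in where needed:
+  ```bash
+  awk 'NR==1 {print $0",organization"; next} {print $0","}' /tmp/my_aliases.csv > /tmp/my_aliases_org.csv
+  ```
+  Rows left with an empty organization fall back to the domain extracted
+  from the email (see the NOTE below).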
+
+  *NOTE:*
+  * **organization** is extracted from the committer email domain when not specified
+  * **author** defaults to the committer name when not specified
 
 ## Development environment
 
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index c6ca959..c9db628 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -55,7 +55,9 @@
     }
   }
 
-  def getEmailAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
+  def getAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
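+    // Reads the optional aliases CSV; a header row of
+    // "author,email,organization" is expected (see the README).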
     emailAliases.map { path =>
       spark.sqlContext.read
         .option("header", "true")
@@ -99,26 +99,35 @@
           "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
     }
 
-    def handleAuthorEMailAliases(emailAliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
-      emailAliasesDF
+    def handleAliases(aliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+      aliasesDF
         .map{ eaDF =>
                 val renamedAliasesDF = eaDF
                                         .withColumnRenamed("email", "email_alias")
                                         .withColumnRenamed("author", "author_alias")
+                                        .withColumnRenamed("organization", "organization_alias")
+
                 df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
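+                  // Prefer a non-empty organization from the alias CSV; for a
+                  // null alias, notEqual() yields null, which when() treats as
+                  // false, so null and "" both keep the email-derived value.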
+                  .withColumn("organization",
+                    when(renamedAliasesDF("organization_alias").notEqual(""), renamedAliasesDF("organization_alias"))
+                      .otherwise(df("organization")) )
                   .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
-                  .drop("email_alias","author_alias")
+                  .drop("email_alias","author_alias", "organization_alias")
             }
         .getOrElse(df)
     }
 
     def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
       df.withColumn(columnName, longDateToISOUdf(col(columnName)))
     }
 
-    def addOrganization()(implicit spark: SparkSession): DataFrame = {
+    def addOrganization()(implicit spark: SparkSession): DataFrame =
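+      // Derives the organization from the email domain; handleAliases() may
+      // later override it with a non-empty value from the aliases CSV.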
       df.withColumn("organization", emailToDomainUdf(col("email")))
-    }
   }
 
   private def emailToDomain(email: String): String = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 02943b6..5a658ef 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -69,16 +69,18 @@
     import spark.sqlContext.implicits._ // toDF
     val sc = spark.sparkContext
     val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
-    val emailAliasesDF = getEmailAliasDF(config.emailAlias)
+    val aliasesDF = getAliasDF(config.emailAlias)
 
     projects
       .enrichWithSource
       .fetchRawContributors
       .toDF("project", "json")
       .transformCommitterInfo
-      .handleAuthorEMailAliases(emailAliasesDF)
-      .convertDates("last_commit_date")
       .addOrganization()
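+      // Order matters: addOrganization() fills the organization from the
+      // email domain first, then handleAliases() may override it.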
+      .handleAliases(aliasesDF)
+      .convertDates("last_commit_date")
   }
   def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
     import org.elasticsearch.spark.sql._
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 965f036..f940d75 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -118,26 +118,58 @@
     )
   }
 
-  "handleAuthorEMailAliases" should "enrich the data with author from the alias DF" in {
+  "handleAliases" should "enrich the data with author from the alias DF" in {
     import spark.implicits._
-    val goodAliasDF = sc.parallelize(Seq(
-      ("aliased_author", "aliased_email@mail.com")
-    )).toDF("author", "email")
+
+    val aliasDF = sc.parallelize(Seq(
+      ("aliased_author", "aliased_email@aliased_author.com", "")
+    )).toDF("author", "email", "organization")
 
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_from_name_a", "non_aliased_email@mail.com"),
-      ("author_from_name_b", "aliased_email@mail.com")
-    )).toDF("author", "email")
+      ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+      ("author_from_name_b", "aliased_email@aliased_author.com", "aliased_author.com")
+    )).toDF("author", "email", "organization")
 
     val expectedDF = sc.parallelize(Seq(
-      ("author_from_name_a", "non_aliased_email@mail.com"),
-      ("aliased_author", "aliased_email@mail.com")
-    )).toDF("author", "email")
+      ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+      ("aliased_author", "aliased_email@aliased_author.com", "aliased_author.com")
+    )).toDF("author", "email", "organization")
 
-    val df = inputSampleDF.handleAuthorEMailAliases(Some(goodAliasDF))
+    val df = inputSampleDF.handleAliases(Some(aliasDF))
 
     df.schema.fields.map(_.name) should contain allOf(
-      "author", "email")
+      "author", "email", "organization")
+
+    df.collect should contain theSameElementsAs expectedDF.collect
+  }
+
+  it should "enrich the data with organization from the alias DF when available" in {
+    import spark.implicits._
+
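+    // Three alias cases: a non-empty organization overrides the email-derived
+    // one; an empty string and a null both fall back to it.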
+    val aliasDF = sc.parallelize(Seq(
+      ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
+      ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
+      ("aliased_author_null_organization", "aliased_email@null_organization.com", null)
+    )).toDF("author", "email", "organization")
+
+    val inputSampleDF = sc.parallelize(Seq(
+      ("author_from_name_a", "aliased_email@aliased_organization.com", "aliased_organization.com"),
+      ("author_from_name_b", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
+      ("author_from_name_c", "aliased_email@null_organization.com", "null_organization.com")
+    )).toDF("author", "email", "organization")
+
+    val expectedDF = sc.parallelize(Seq(
+      ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
+      ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
+      ("aliased_author_null_organization", "aliased_email@null_organization.com", "null_organization.com")
+    )).toDF("author", "email", "organization")
+
+    val df = inputSampleDF.handleAliases(Some(aliasDF))
+
+    df.schema.fields.map(_.name) should contain allOf(
+      "author", "email", "organization")
 
     df.collect should contain theSameElementsAs expectedDF.collect
   }
@@ -145,27 +176,27 @@
   it should "return correct columns when alias DF is defined" in {
     import spark.implicits._
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_name", "email@mail.com")
-    )).toDF("author", "email")
+      ("author_name", "email@mail.com", "an_organization")
+    )).toDF("author", "email", "organization")
 
     val aliasDF = sc.parallelize(Seq(
-      ("a_random_author", "a_random_email@mail.com")
-    )).toDF("author", "email")
+      ("a_random_author", "a_random_email@mail.com", "a_random_organization")
+    )).toDF("author", "email", "organization")
 
-    val df = inputSampleDF.handleAuthorEMailAliases(Some(aliasDF))
+    val df = inputSampleDF.handleAliases(Some(aliasDF))
 
-    df.schema.fields.map(_.name) should contain allOf("author", "email")
+    df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
   }
 
   it should "return correct columns when alias DF is not defined" in {
     import spark.implicits._
     val inputSampleDF = sc.parallelize(Seq(
-      ("author_name", "email@mail.com")
-    )).toDF("author", "email")
+      ("author_name", "email@mail.com", "an_organization")
+    )).toDF("author", "email", "organization")
 
-    val df = inputSampleDF.handleAuthorEMailAliases(None)
+    val df = inputSampleDF.handleAliases(None)
 
-    df.schema.fields.map(_.name) should contain allOf("author", "email")
+    df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
   }
 
   "addOrganization" should "compute organization column from the email" in {
@@ -188,7 +219,6 @@
       Row("not an email", ""),
       Row("email@domain", "domain")
     )
-
   }
 
   "convertDates" should "process specific column from Long to ISO date" in {