Input aliases from CSV instead of JSON

CSV files can be generated easily with simple git commands.
Furthermore, the code is simpler than the JSON-based version.
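
For example, an aliases CSV covering both authors and committers can be
generated directly from the git history (this is the same command
documented in the README below):

  echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce" | sort | uniq)" > /tmp/my_aliases.csv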

Change-Id: I4332497407df40ce1e68f0de304fad45ad5f6b8e
diff --git a/README.md b/README.md
index cf1c8a7..4504d44 100644
--- a/README.md
+++ b/README.md
@@ -23,10 +23,22 @@
 - -o --out folder location for storing the output as JSON files
     if not provided data is saved to </tmp>/analytics-<NNNN> where </tmp> is
     the system temporary directory
-- -a --email-aliases (*optional*) "emails to author alias" input data path. Here an example of the required files structure:
-  ```json
-  {"author": "John", "emails": ["john@email.com", "john@anotheremail.com"]}
-  {"author": "David", "emails": ["david.smith@email.com", "david@myemail.com"]}
+- -a --email-aliases (*optional*) "emails to author alias" input data path.
+
+  The input is expected to be CSV files with two columns: author and email.
+
+  Here is an example of the required file structure:
+  ```csv
+  author,email
+  John Smith,john@email.com
+  John Smith,john@anotheremail.com
+  David Smith,david.smith@email.com
+  David Smith,david@myemail.com
+  ```
+
+  You can use the following command to quickly extract the list of authors and emails into an input CSV file:
+  ```bash
+  echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce" | sort | uniq)" > /tmp/my_aliases.csv
   ```
 
 ## Development environment
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 5167296..c6ca959 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -26,7 +26,6 @@
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
 object GerritAnalyticsTransformations {
 
@@ -56,12 +55,15 @@
     }
   }
 
-  def getEmailAliasDF(emailAliasesRDD: RDD[String])(implicit spark: SparkSession): DataFrame = {
-    import spark.sqlContext.implicits._
-    Try {
-      val df = spark.sqlContext.read.json(emailAliasesRDD).toDF()
-      df.withColumn("email_alias", explode(df("emails"))).drop("emails")
-    }.getOrElse(spark.emptyDataset[CommitterInfo].toDF())
+  def getEmailAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
+    emailAliases.map { path =>
+      spark.sqlContext.read
+        .option("header", "true")
+        .option("mode", "DROPMALFORMED")
+        .option("inferSchema", "true")
+        .csv(path)
+        .toDF()
+    }
   }
 
   case class CommitterInfo(author: String, email_alias: String)
@@ -91,16 +93,23 @@
       import spark.sqlContext.implicits._
       df.withColumn("json", from_json($"json", schema))
         .selectExpr(
-          "project", "json.name as name", "json.email as email",
+          "project", "json.name as author", "json.email as email",
           "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
           "json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
           "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
     }
 
-    def handleAuthorEMailAliases(emailAliasesDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
-      import spark.sqlContext.implicits._
-      df.join(emailAliasesDF, df("email") === emailAliasesDF("email_alias"), "left_outer" )
-        .withColumn("author", coalesce($"author", $"name")).drop("name", "email_alias")
+    def handleAuthorEMailAliases(emailAliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+      emailAliasesDF
+        .map { eaDF =>
+          val renamedAliasesDF = eaDF
+            .withColumnRenamed("email", "email_alias")
+            .withColumnRenamed("author", "author_alias")
+          df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer")
+            .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
+            .drop("email_alias", "author_alias")
+        }
+        .getOrElse(df)
     }
 
     def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 2967931..02943b6 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -69,8 +69,8 @@
     import spark.sqlContext.implicits._ // toDF
     val sc = spark.sparkContext
     val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
-    val aliasesRDD = config.emailAlias.map(path => sc.textFile(path)).getOrElse(sc.emptyRDD)
-    val emailAliasesDF = getEmailAliasDF(aliasesRDD)
+    val emailAliasesDF = getEmailAliasDF(config.emailAlias)
+
     projects
       .enrichWithSource
       .fetchRawContributors
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index c0bbd53..965f036 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -106,7 +106,7 @@
     val collected = df.collect
 
     df.schema.fields.map(_.name) should contain allOf(
-      "project", "name", "email",
+      "project", "author", "email",
       "year", "month", "day", "hour",
       "num_files", "added_lines", "deleted_lines",
       "num_commits", "last_commit_date")
@@ -122,19 +122,19 @@
     import spark.implicits._
     val goodAliasDF = sc.parallelize(Seq(
       ("aliased_author", "aliased_email@mail.com")
-    )).toDF("author", "email_alias")
+    )).toDF("author", "email")
 
     val inputSampleDF = sc.parallelize(Seq(
       ("author_from_name_a", "non_aliased_email@mail.com"),
       ("author_from_name_b", "aliased_email@mail.com")
-    )).toDF("name", "email")
+    )).toDF("author", "email")
 
     val expectedDF = sc.parallelize(Seq(
-      ("non_aliased_email@mail.com", "author_from_name_a"),
-      ("aliased_email@mail.com", "aliased_author")
-    )).toDF("email", "author")
+      ("author_from_name_a", "non_aliased_email@mail.com"),
+      ("aliased_author", "aliased_email@mail.com")
+    )).toDF("author", "email")
 
-    val df = inputSampleDF.handleAuthorEMailAliases(goodAliasDF)
+    val df = inputSampleDF.handleAuthorEMailAliases(Some(goodAliasDF))
 
     df.schema.fields.map(_.name) should contain allOf(
       "author", "email")
@@ -142,19 +142,30 @@
     df.collect should contain theSameElementsAs expectedDF.collect
   }
 
-  "getEmailAliasDF" should "not choke on syntax errors in json file" in {
-    val wrongAliasesJSON = """{"unexpected_field": "unexpected_value"}"""
+  it should "return correct columns when alias DF is defined" in {
+    import spark.implicits._
+    val inputSampleDF = sc.parallelize(Seq(
+      ("author_name", "email@mail.com")
+    )).toDF("author", "email")
 
-    val df = getEmailAliasDF(sc.parallelize(Seq(wrongAliasesJSON)))
-    df.collect() should be(empty)
+    val aliasDF = sc.parallelize(Seq(
+      ("a_random_author", "a_random_email@mail.com")
+    )).toDF("author", "email")
+
+    val df = inputSampleDF.handleAuthorEMailAliases(Some(aliasDF))
+
+    df.schema.fields.map(_.name) should contain allOf("author", "email")
   }
 
-  it should "return a DF with authors and email aliases" in {
-    val aliasesJSON = """{"author": "author_from_alias_file", "emails": ["aliased_email@mail.com"]}"""
+  it should "return correct columns when alias DF is not defined" in {
+    import spark.implicits._
+    val inputSampleDF = sc.parallelize(Seq(
+      ("author_name", "email@mail.com")
+    )).toDF("author", "email")
 
-    val df = getEmailAliasDF(sc.parallelize(Seq(aliasesJSON)))
+    val df = inputSampleDF.handleAuthorEMailAliases(None)
 
-    df.schema.fields.map(_.name) should contain allOf("author", "email_alias")
+    df.schema.fields.map(_.name) should contain allOf("author", "email")
   }
 
   "addOrganization" should "compute organization column from the email" in {