Input aliases from CSV instead of JSON
CSVs can be easily generated with simple git commands.
Furthermore, the code is simplified compared to the JSON version.
Change-Id: I4332497407df40ce1e68f0de304fad45ad5f6b8e
diff --git a/README.md b/README.md
index cf1c8a7..4504d44 100644
--- a/README.md
+++ b/README.md
@@ -23,10 +23,22 @@
- -o --out folder location for storing the output as JSON files
if not provided data is saved to </tmp>/analytics-<NNNN> where </tmp> is
the system temporary directory
-- -a --email-aliases (*optional*) "emails to author alias" input data path. Here an example of the required files structure:
- ```json
- {"author": "John", "emails": ["john@email.com", "john@anotheremail.com"]}
- {"author": "David", "emails": ["david.smith@email.com", "david@myemail.com"]}
+- -a --email-aliases (*optional*) "emails to author alias" input data path.
+
+ CSV files with two columns are expected as input.
+
+ Here is an example of the required file structure:
+ ```csv
+ author,email
+ John Smith,john@email.com
+ John Smith,john@anotheremail.com
+ David Smith,david.smith@email.com
+ David Smith,david@myemail.com
+ ```
+
+ You can use the following command to quickly extract the list of authors and emails to create an input CSV file:
+ ```bash
+ echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce"|sort |uniq )" > /tmp/my_aliases.csv
```
## Development environment
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 5167296..c6ca959 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -26,7 +26,6 @@
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
-import scala.util.Try
object GerritAnalyticsTransformations {
@@ -56,12 +55,15 @@
}
}
- def getEmailAliasDF(emailAliasesRDD: RDD[String])(implicit spark: SparkSession): DataFrame = {
- import spark.sqlContext.implicits._
- Try {
- val df = spark.sqlContext.read.json(emailAliasesRDD).toDF()
- df.withColumn("email_alias", explode(df("emails"))).drop("emails")
- }.getOrElse(spark.emptyDataset[CommitterInfo].toDF())
+ def getEmailAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
+ emailAliases.map { path =>
+ spark.sqlContext.read
+ .option("header", "true")
+ .option("mode", "DROPMALFORMED")
+ .option("inferSchema", "true")
+ .csv(path)
+ .toDF()
+ }
}
case class CommitterInfo(author: String, email_alias: String)
@@ -91,16 +93,23 @@
import spark.sqlContext.implicits._
df.withColumn("json", from_json($"json", schema))
.selectExpr(
- "project", "json.name as name", "json.email as email",
+ "project", "json.name as author", "json.email as email",
"json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
"json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
"json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
}
- def handleAuthorEMailAliases(emailAliasesDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
- import spark.sqlContext.implicits._
- df.join(emailAliasesDF, df("email") === emailAliasesDF("email_alias"), "left_outer" )
- .withColumn("author", coalesce($"author", $"name")).drop("name", "email_alias")
+ def handleAuthorEMailAliases(emailAliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+ emailAliasesDF
+ .map{ eaDF =>
+ val renamedAliasesDF = eaDF
+ .withColumnRenamed("email", "email_alias")
+ .withColumnRenamed("author", "author_alias")
+ df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
+ .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
+ .drop("email_alias","author_alias")
+ }
+ .getOrElse(df)
}
def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 2967931..02943b6 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -69,8 +69,8 @@
import spark.sqlContext.implicits._ // toDF
val sc = spark.sparkContext
val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
- val aliasesRDD = config.emailAlias.map(path => sc.textFile(path)).getOrElse(sc.emptyRDD)
- val emailAliasesDF = getEmailAliasDF(aliasesRDD)
+ val emailAliasesDF = getEmailAliasDF(config.emailAlias)
+
projects
.enrichWithSource
.fetchRawContributors
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index c0bbd53..965f036 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -106,7 +106,7 @@
val collected = df.collect
df.schema.fields.map(_.name) should contain allOf(
- "project", "name", "email",
+ "project", "author", "email",
"year", "month", "day", "hour",
"num_files", "added_lines", "deleted_lines",
"num_commits", "last_commit_date")
@@ -122,19 +122,19 @@
import spark.implicits._
val goodAliasDF = sc.parallelize(Seq(
("aliased_author", "aliased_email@mail.com")
- )).toDF("author", "email_alias")
+ )).toDF("author", "email")
val inputSampleDF = sc.parallelize(Seq(
("author_from_name_a", "non_aliased_email@mail.com"),
("author_from_name_b", "aliased_email@mail.com")
- )).toDF("name", "email")
+ )).toDF("author", "email")
val expectedDF = sc.parallelize(Seq(
- ("non_aliased_email@mail.com", "author_from_name_a"),
- ("aliased_email@mail.com", "aliased_author")
- )).toDF("email", "author")
+ ("author_from_name_a", "non_aliased_email@mail.com"),
+ ("aliased_author", "aliased_email@mail.com")
+ )).toDF("author", "email")
- val df = inputSampleDF.handleAuthorEMailAliases(goodAliasDF)
+ val df = inputSampleDF.handleAuthorEMailAliases(Some(goodAliasDF))
df.schema.fields.map(_.name) should contain allOf(
"author", "email")
@@ -142,19 +142,30 @@
df.collect should contain theSameElementsAs expectedDF.collect
}
- "getEmailAliasDF" should "not choke on syntax errors in json file" in {
- val wrongAliasesJSON = """{"unexpected_field": "unexpected_value"}"""
+ it should "return correct columns when alias DF is defined" in {
+ import spark.implicits._
+ val inputSampleDF = sc.parallelize(Seq(
+ ("author_name", "email@mail.com")
+ )).toDF("author", "email")
- val df = getEmailAliasDF(sc.parallelize(Seq(wrongAliasesJSON)))
- df.collect() should be(empty)
+ val aliasDF = sc.parallelize(Seq(
+ ("a_random_author", "a_random_email@mail.com")
+ )).toDF("author", "email")
+
+ val df = inputSampleDF.handleAuthorEMailAliases(Some(aliasDF))
+
+ df.schema.fields.map(_.name) should contain allOf("author", "email")
}
- it should "return a DF with authors and email aliases" in {
- val aliasesJSON = """{"author": "author_from_alias_file", "emails": ["aliased_email@mail.com"]}"""
+ it should "return correct columns when alias DF is not defined" in {
+ import spark.implicits._
+ val inputSampleDF = sc.parallelize(Seq(
+ ("author_name", "email@mail.com")
+ )).toDF("author", "email")
- val df = getEmailAliasDF(sc.parallelize(Seq(aliasesJSON)))
+ val df = inputSampleDF.handleAuthorEMailAliases(None)
- df.schema.fields.map(_.name) should contain allOf("author", "email_alias")
+ df.schema.fields.map(_.name) should contain allOf("author", "email")
}
"addOrganization" should "compute organization column from the email" in {