Allow authors to have multiple emails
Authors can have multiple emails, but it is useful to group together
all the commits coming from the same author.
This change allows the mapping of different emails to the same author.
The "name" field has been dropped in favour of "author".
author = "author" from alias file || "name" from user activity
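For illustration, a minimal self-contained sketch of the rule above, run
on hypothetical sample data (the actual implementation is
handleAuthorEMailAliases in GerritAnalyticsTransformations below):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

object EmailAliasSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Commits as extracted from user activity: "name" plus the commit email.
  val commits = Seq(
    ("John Doe", "john@anotheremail.com"),
    ("Jane",     "jane@email.com")
  ).toDF("name", "email")

  // Alias file content, already exploded to one (author, email_alias) row per email.
  val aliases = Seq(("John", "john@anotheremail.com")).toDF("author", "email_alias")

  // A left outer join keeps commits with no alias; coalesce implements
  // author = "author" from alias file || "name" from user activity.
  commits.join(aliases, commits("email") === aliases("email_alias"), "left_outer")
    .withColumn("author", coalesce($"author", $"name"))
    .drop("name", "email_alias")
    .show() // John Doe's commit is attributed to "John"; Jane falls back to her name

  spark.stop()
}
```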
Change-Id: I7ee6900f40b51ee9f6785676bc0fc169a7e56a29
diff --git a/README.md b/README.md
index 34a959a..cf1c8a7 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,11 @@
- -o --out folder location for storing the output as JSON files
if not provided data is saved to </tmp>/analytics-<NNNN> where </tmp> is
the system temporary directory
+- -a --email-aliases (*optional*) "emails to author alias" input data path. Here is an example of the required file structure:
+ ```json
+ {"author": "John", "emails": ["john@email.com", "john@anotheremail.com"]}
+ {"author": "David", "emails": ["david.smith@email.com", "david@myemail.com"]}
+ ```
## Development environment
diff --git a/src/main/resources/email-aliasing.input.example b/src/main/resources/email-aliasing.input.example
new file mode 100644
index 0000000..f18e7f2
--- /dev/null
+++ b/src/main/resources/email-aliasing.input.example
@@ -0,0 +1,2 @@
+{"author": "John", "emails": ["john@email.com", "john@anotheremail.com"]}
+{"author": "David", "emails": ["david.smith@email.com", "david@myemail.com"]}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 7736300..5167296 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -26,6 +26,7 @@
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
+import scala.util.Try
object GerritAnalyticsTransformations {
@@ -55,6 +56,16 @@
}
}
+ def getEmailAliasDF(emailAliasesRDD: RDD[String])(implicit spark: SparkSession): DataFrame = {
+ import spark.sqlContext.implicits._
+ Try {
+ val df = spark.sqlContext.read.json(emailAliasesRDD).toDF()
+ df.withColumn("email_alias", explode(df("emails"))).drop("emails")
+ }.getOrElse(spark.emptyDataset[CommitterInfo].toDF())
+ }
+
+ case class CommitterInfo(author: String, email_alias: String)
+
case class CommitInfo(sha1: String, date: Long, isMerge: Boolean)
case class UserActivitySummary(year: Integer,
@@ -75,15 +86,21 @@
val schema = Encoders.product[UserActivitySummary].schema
implicit class PimpedDataFrame(val df: DataFrame) extends AnyVal {
- def transformWorkableDF(implicit spark: SparkSession): DataFrame = {
+ def transformCommitterInfo()(implicit spark: SparkSession): DataFrame = {
import org.apache.spark.sql.functions.from_json
import spark.sqlContext.implicits._
- val decoded = df.withColumn("json", from_json($"json", schema))
- decoded.selectExpr(
- "project", "json.name as name", "json.email as email",
- "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
- "json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
- "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
+ df.withColumn("json", from_json($"json", schema))
+ .selectExpr(
+ "project", "json.name as name", "json.email as email",
+ "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
+ "json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
+ "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
+ }
+
+ def handleAuthorEMailAliases(emailAliasesDF: DataFrame)(implicit spark: SparkSession): DataFrame = {
+ import spark.sqlContext.implicits._
+ df.join(emailAliasesDF, df("email") === emailAliasesDF("email_alias"), "left_outer")
+ .withColumn("author", coalesce($"author", $"name")).drop("name", "email_alias")
}
def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 089b21c..2967931 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -42,6 +42,13 @@
opt[String]('g', "aggregate") optional() action { (x, c) =>
c.copy(aggregate = Some(x))
} text "aggregate email/email_hour/email_day/email_month/email_year"
+ opt[String]('a', "email-aliases") optional() action { (path, c) =>
+ if (!new java.io.File(path).exists) {
+ println(s"ERROR: Path '${path}' doesn't exists!")
+ System.exit(1)
+ }
+ c.copy(emailAlias = Some(path))
+ } text "\"emails to author alias\" input data path"
}.parse(args, GerritEndpointConfig()) match {
case Some(config) =>
implicit val spark = SparkSession.builder()
@@ -60,19 +67,18 @@
def run()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
import spark.sqlContext.implicits._ // toDF
-
val sc = spark.sparkContext
val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
-
- val processedDF = projects
+ val aliasesRDD = config.emailAlias.map(path => sc.textFile(path)).getOrElse(sc.emptyRDD)
+ val emailAliasesDF = getEmailAliasDF(aliasesRDD)
+ projects
.enrichWithSource
.fetchRawContributors
.toDF("project", "json")
- .transformWorkableDF
+ .transformCommitterInfo
+ .handleAuthorEMailAliases(emailAliasesDF)
.convertDates("last_commit_date")
.addOrganization()
-
- processedDF
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
import org.elasticsearch.spark.sql._
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 206c514..a32abd0 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -19,7 +19,8 @@
elasticIndex: Option[String] = None,
since: Option[String] = None,
until: Option[String] = None,
- aggregate: Option[String] = None) {
+ aggregate: Option[String] = None,
+ emailAlias: Option[String] = None) {
def queryOpt(opt: (String, Option[String])): Option[String] = {
opt match {
@@ -32,4 +33,4 @@
def contributorsUrl(projectName: String) =
s"${baseUrl}/projects/$projectName/analytics~contributors${queryString}"
-}
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index b2c80e1..c0bbd53 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -16,13 +16,13 @@
import java.io.{File, FileWriter}
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContributionSource}
import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s._
-import org.scalatest.{FlatSpec, Inside, Matchers}
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
import org.json4s.jackson.JsonMethods.{compact, render}
+import org.scalatest.{FlatSpec, Inside, Matchers}
import scala.io.Source
@@ -89,8 +89,7 @@
)
}
- "transformWorkableDF" should "transform a DataFrame with project and json to a workable DF with separated columns" in {
-
+ "transformCommitterInfo" should "transform a DataFrame with project and json to a workable DF with separated columns" in {
import sql.implicits._
val rdd = sc.parallelize(Seq(
@@ -101,7 +100,7 @@
))
val df = rdd.toDF("project", "json")
- .transformWorkableDF
+ .transformCommitterInfo
df.count should be(3)
val collected = df.collect
@@ -119,6 +118,45 @@
)
}
+ "handleAuthorEMailAliases" should "enrich the data with author from the alias DF" in {
+ import spark.implicits._
+ val goodAliasDF = sc.parallelize(Seq(
+ ("aliased_author", "aliased_email@mail.com")
+ )).toDF("author", "email_alias")
+
+ val inputSampleDF = sc.parallelize(Seq(
+ ("author_from_name_a", "non_aliased_email@mail.com"),
+ ("author_from_name_b", "aliased_email@mail.com")
+ )).toDF("name", "email")
+
+ val expectedDF = sc.parallelize(Seq(
+ ("non_aliased_email@mail.com", "author_from_name_a"),
+ ("aliased_email@mail.com", "aliased_author")
+ )).toDF("email", "author")
+
+ val df = inputSampleDF.handleAuthorEMailAliases(goodAliasDF)
+
+ df.schema.fields.map(_.name) should contain allOf(
+ "author", "email")
+
+ df.collect should contain theSameElementsAs expectedDF.collect
+ }
+
+ "getEmailAliasDF" should "not choke on syntax errors in json file" in {
+ val wrongAliasesJSON = """{"unexpected_field": "unexpected_value"}"""
+
+ val df = getEmailAliasDF(sc.parallelize(Seq(wrongAliasesJSON)))
+ df.collect() should be(empty)
+ }
+
+ it should "return a DF with authors and email aliases" in {
+ val aliasesJSON = """{"author": "author_from_alias_file", "emails": ["aliased_email@mail.com"]}"""
+
+ val df = getEmailAliasDF(sc.parallelize(Seq(aliasesJSON)))
+
+ df.schema.fields.map(_.name) should contain allOf("author", "email_alias")
+ }
+
"addOrganization" should "compute organization column from the email" in {
import sql.implicits._
@@ -131,7 +169,7 @@
val transformed = df.addOrganization()
- transformed.schema.fields.map(_.name) should contain allOf("email","organization")
+ transformed.schema.fields.map(_.name) should contain allOf("email", "organization")
transformed.collect should contain allOf(
Row("", ""),