Add organization information
Use the organization value specified in the CSV file or, when it is not
provided, extract it from the email domain as previously implemented.
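
Per row, the resolution boils down to the following sketch (illustrative
only: resolveOrganization is a hypothetical name, the actual change works
on Spark DataFrame columns):

```scala
// A non-empty organization from the alias CSV wins; otherwise fall back
// to the organization derived from the email domain ("" if no domain).
def resolveOrganization(csvOrg: Option[String], email: String): String =
  csvOrg.filter(_.nonEmpty).getOrElse(
    if (email.contains("@")) email.substring(email.indexOf('@') + 1) else "")
```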
Change-Id: I890fdbad48c9086c03c980d98742a7236776f3c5
diff --git a/README.md b/README.md
index 4504d44..65d1059 100644
--- a/README.md
+++ b/README.md
@@ -25,21 +25,26 @@
the system temporary directory
- -a --email-aliases (*optional*) "emails to author alias" input data path.
- CSVs with 2 columns are expected in input.
+ CSVs with 3 columns are expected as input.
Here is an example of the required file structure:
```csv
- author,email
- John Smith,john@email.com
- John Smith,john@anotheremail.com
- David Smith,david.smith@email.com
- David Smith,david@myemail.com
+ author,email,organization
+ John Smith,john@email.com,John's Company
+ John Smith,john@anotheremail.com,John's Company
+ David Smith,david.smith@email.com,Independent
+ David Smith,david@myemail.com,Independent
```
- You can use the following command to quickly extract the list of authors and emails to create an input CSV file:
+ You can use the following command to quickly extract the list of authors and emails and pre-populate the first two columns of an input CSV file:
```bash
echo -e "author,email\n$(git log --pretty="%an,%ae%n%cn,%ce"|sort |uniq )" > /tmp/my_aliases.csv
```
+ Once you have it, you just have to add the organization column.
+
+ *NOTE:*
+ * **organization** is extracted from the committer email domain when not specified (see the example below)
+ * **author** defaults to the committer name when not specified
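+
+ For example, a row such as `Jane Doe,jane@corp.com,` (hypothetical data, empty organization) results in the organization being derived from the email domain, i.e. `corp.com`.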
## Development environment
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index c6ca959..c9db628 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -55,7 +55,7 @@
}
}
- def getEmailAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
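+ // Loads the optional alias CSV described in the README (author,email,organization; header expected).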
+ def getAliasDF(emailAliases: Option[String])(implicit spark: SparkSession): Option[DataFrame] = {
emailAliases.map { path =>
spark.sqlContext.read
.option("header", "true")
@@ -99,26 +99,31 @@
"json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
}
- def handleAuthorEMailAliases(emailAliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
- emailAliasesDF
+ def handleAliases(aliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+ aliasesDF
.map{ eaDF =>
val renamedAliasesDF = eaDF
.withColumnRenamed("email", "email_alias")
.withColumnRenamed("author", "author_alias")
+ .withColumnRenamed("organization", "organization_alias")
+
df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
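+ // Keep the organization from the alias CSV only when it is non-empty;
+ // notEqual("") evaluates to null for null values, so otherwise() also
+ // preserves the email-derived organization for null and unmatched rows.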
+ .withColumn("organization",
+ when(renamedAliasesDF("organization_alias").notEqual(""), renamedAliasesDF("organization_alias"))
+ .otherwise(df("organization")) )
.withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
- .drop("email_alias","author_alias")
+ .drop("email_alias","author_alias", "organization_alias")
}
.getOrElse(df)
}
+
def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
df.withColumn(columnName, longDateToISOUdf(col(columnName)))
}
- def addOrganization()(implicit spark: SparkSession): DataFrame = {
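+ // Derives the organization from the email domain; handleAliases may later
+ // override it with the organization provided in the alias CSV.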
+ def addOrganization()(implicit spark: SparkSession): DataFrame =
df.withColumn("organization", emailToDomainUdf(col("email")))
- }
}
private def emailToDomain(email: String): String = {
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 02943b6..5a658ef 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -69,16 +69,17 @@
import spark.sqlContext.implicits._ // toDF
val sc = spark.sparkContext
val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
- val emailAliasesDF = getEmailAliasDF(config.emailAlias)
+ val aliasesDF = getAliasDF(config.emailAlias)
projects
.enrichWithSource
.fetchRawContributors
.toDF("project", "json")
.transformCommitterInfo
- .handleAuthorEMailAliases(emailAliasesDF)
- .convertDates("last_commit_date")
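+ // Compute the email-derived organization first, so that handleAliases can
+ // override it with the CSV-provided value when one exists.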
.addOrganization()
+ .handleAliases(aliasesDF)
+ .convertDates("last_commit_date")
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
import org.elasticsearch.spark.sql._
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 965f036..f940d75 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -118,26 +118,57 @@
)
}
- "handleAuthorEMailAliases" should "enrich the data with author from the alias DF" in {
+ "handleAliases" should "enrich the data with author from the alias DF" in {
import spark.implicits._
- val goodAliasDF = sc.parallelize(Seq(
- ("aliased_author", "aliased_email@mail.com")
- )).toDF("author", "email")
+
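+ // The alias row leaves organization empty: the email-derived organization must be kept.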
+ val aliasDF = sc.parallelize(Seq(
+ ("aliased_author", "aliased_email@aliased_author.com", "")
+ )).toDF("author", "email", "organization")
val inputSampleDF = sc.parallelize(Seq(
- ("author_from_name_a", "non_aliased_email@mail.com"),
- ("author_from_name_b", "aliased_email@mail.com")
- )).toDF("author", "email")
+ ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+ ("author_from_name_b", "aliased_email@aliased_author.com", "aliased_author.com")
+ )).toDF("author", "email", "organization")
val expectedDF = sc.parallelize(Seq(
- ("author_from_name_a", "non_aliased_email@mail.com"),
- ("aliased_author", "aliased_email@mail.com")
- )).toDF("author", "email")
+ ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+ ("aliased_author", "aliased_email@aliased_author.com", "aliased_author.com")
+ )).toDF("author", "email", "organization")
- val df = inputSampleDF.handleAuthorEMailAliases(Some(goodAliasDF))
+ val df = inputSampleDF.handleAliases(Some(aliasDF))
df.schema.fields.map(_.name) should contain allOf(
- "author", "email")
+ "author", "email", "organization")
+
+ df.collect should contain theSameElementsAs expectedDF.collect
+ }
+
+ it should "enrich the data with organization from the alias DF when available" in {
+ import spark.implicits._
+
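+ // Alias rows cover the three organization cases: non-empty, empty string and null.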
+ val aliasDF = sc.parallelize(Seq(
+ ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
+ ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
+ ("aliased_author_null_organization", "aliased_email@null_organization.com", null)
+
+ )).toDF("author", "email", "organization")
+
+ val inputSampleDF = sc.parallelize(Seq(
+ ("author_from_name_a", "aliased_email@aliased_organization.com", "aliased_organization.com"),
+ ("author_from_name_b", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
+ ("author_from_name_c", "aliased_email@null_organization.com", "null_organization.com")
+ )).toDF("author", "email", "organization")
+
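+ // Expected: the CSV organization wins when non-empty; empty and null fall back to the email-derived value.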
+ val expectedDF = sc.parallelize(Seq(
+ ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
+ ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
+ ("aliased_author_null_organization", "aliased_email@null_organization.com", "null_organization.com")
+ )).toDF("author", "email", "organization")
+
+ val df = inputSampleDF.handleAliases(Some(aliasDF))
+
+ df.schema.fields.map(_.name) should contain allOf(
+ "author", "email", "organization")
df.collect should contain theSameElementsAs expectedDF.collect
}
@@ -145,27 +176,27 @@
it should "return correct columns when alias DF is defined" in {
import spark.implicits._
val inputSampleDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com")
- )).toDF("author", "email")
+ ("author_name", "email@mail.com", "an_organization")
+ )).toDF("author", "email", "organization")
val aliasDF = sc.parallelize(Seq(
- ("a_random_author", "a_random_email@mail.com")
- )).toDF("author", "email")
+ ("a_random_author", "a_random_email@mail.com", "a_random_organization")
+ )).toDF("author", "email", "organization")
- val df = inputSampleDF.handleAuthorEMailAliases(Some(aliasDF))
+ val df = inputSampleDF.handleAliases(Some(aliasDF))
- df.schema.fields.map(_.name) should contain allOf("author", "email")
+ df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
}
it should "return correct columns when alias DF is not defined" in {
import spark.implicits._
val inputSampleDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com")
- )).toDF("author", "email")
+ ("author_name", "email@mail.com", "an_organization")
+ )).toDF("author", "email", "organization")
- val df = inputSampleDF.handleAuthorEMailAliases(None)
+ val df = inputSampleDF.handleAliases(None)
- df.schema.fields.map(_.name) should contain allOf("author", "email")
+ df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
}
"addOrganization" should "compute organization column from the email" in {
@@ -188,7 +219,6 @@
Row("not an email", ""),
Row("email@domain", "domain")
)
-
}
"convertDates" should "process specific column from Long to ISO date" in {