Extract organization field to ElasticSearch.
Obtained from the @domain part of the committer's email.
Change-Id: Ib5b3b9996ec78780007cd6f95a47e2dccd570364
diff --git a/kibana/organizations-visualizations.json b/kibana/organizations-visualizations.json
new file mode 100644
index 0000000..c6a3752
--- /dev/null
+++ b/kibana/organizations-visualizations.json
@@ -0,0 +1,30 @@
+[
+ {
+ "_id": "6d3a02f0-8b37-11e7-b340-5faa4252a25c",
+ "_type": "visualization",
+ "_source": {
+ "title": "Organization Pie",
+ "visState": "{\"title\":\"Organization Pie\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"organization.keyword\",\"size\":500,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+ "uiStateJSON": "{}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\"index\":\"gerrit\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+ }
+ }
+ },
+ {
+ "_id": "d0c09180-8b38-11e7-b340-5faa4252a25c",
+ "_type": "visualization",
+ "_source": {
+ "title": "Organization Table",
+ "visState": "{\"title\":\"Organization Table\",\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false,\"sort\":{\"columnIndex\":null,\"direction\":null},\"showTotal\":false,\"totalFunc\":\"sum\"},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"num_commits\",\"customLabel\":\"# Commits\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"organization.keyword\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"cardinality\",\"schema\":\"metric\",\"params\":{\"field\":\"project.keyword\",\"customLabel\":\"# Projects\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"cardinality\",\"schema\":\"metric\",\"params\":{\"field\":\"email.keyword\",\"customLabel\":\"# Authors\"}}],\"listeners\":{}}",
+ "uiStateJSON": "{\"vis\":{\"params\":{\"sort\":{\"columnIndex\":null,\"direction\":null}}}}",
+ "description": "",
+ "version": 1,
+ "kibanaSavedObjectMeta": {
+ "searchSourceJSON": "{\"index\":\"gerrit\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+ }
+ }
+ }
+]
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
index 8756178..01e6dc5 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
@@ -3,8 +3,8 @@
import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.URL
import java.nio.charset.StandardCharsets
-import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
import org.apache.spark.rdd.RDD
@@ -23,6 +23,11 @@
ZoneOffset.UTC, ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
+ private[analytics] val emailToDomain: PartialFunction[JValue,String] = {
+ case JString(email) if email.contains("@") && !email.endsWith("@") =>
+ email.split("@").last.toLowerCase
+ }
+
private[analytics] def transformLongDateToISO(in: String): JObject = {
parse(in).transformField {
case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
@@ -30,12 +35,19 @@
}.asInstanceOf[JObject]
}
+ private[analytics] def transformAddOrganization(in: JObject): JObject = {
+ Some(in \ "email")
+ .collect(emailToDomain)
+ .fold(in)(org => ("organization" -> org) ~ in)
+ }
+
def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
val is = new URL(sourceUrl).openConnection.getInputStream
new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
.lines.iterator().asScala
.filterNot(_.trim.isEmpty)
.map(transformLongDateToISO)
+ .map(transformAddOrganization)
.map(ProjectContribution(projectName, _))
}
@@ -74,6 +86,3 @@
}
}
-
-
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index e26013e..b846df6 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -6,9 +6,9 @@
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.jackson.JsonMethods._
-import org.scalatest.{FlatSpec, Inside, Matchers}
+import org.scalatest.{FlatSpec, Inside, Matchers, PartialFunctionValues}
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside with PartialFunctionValues {
import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
@@ -30,21 +30,6 @@
}
}
- it should "return one line enriched with its contributor" in {
- val contributorsJson: JObject = ("foo" -> "bar")
- val projectSource = ProjectContributionSource("project", newSource(contributorsJson))
-
- withSparkContext { sc =>
- val projectContributions = sc.parallelize(Seq(projectSource))
- .fetchContributors
- .collect()
-
- projectContributions should contain(
- ProjectContribution("project", contributorsJson)
- )
- }
- }
-
it should "return two lines enriched with its two contributors" in {
val contributor1: JObject = ("foo" -> "bar")
val contributor2: JObject = ("first" -> "John")
@@ -66,16 +51,17 @@
val pc1 = ProjectContribution("project", ("foo" -> "bar"))
val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
withSparkContext { sc =>
- sc.parallelize(Seq(pc1,pc2)).toJson.collect should contain allOf(
- """{"project":"project","foo":"bar"}""",
- """{"project":"project","foo2":"bar2"}""")
+ sc.parallelize(Seq(pc1, pc2)).toJson.collect should contain allOf(
+ """{"project":"project","foo":"bar"}""",
+ """{"project":"project","foo2":"bar2"}""")
}
}
+
"getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
val contributor1: JObject = ("foo" -> "bar")
- val contributor2: JObject = ("first" -> "A with macron in unicode is: \u0100")
+ val contributor2: JObject = ("first" -> "(A with macron) in unicode is: \u0100")
val url = newSource(contributor1, contributor2)
- val projectContributions = getFileContentAsProjectContributions(url,"project").toArray
+ val projectContributions = getFileContentAsProjectContributions(url, "project").toArray
projectContributions should contain allOf(
ProjectContribution("project", contributor1),
@@ -83,6 +69,16 @@
)
}
+ "emailToDomain" should "parse domain" in {
+ emailToDomain.valueAt("a@x.y") should be ("x.y")
+ }
+
+ "transformAddOrganization" should "add organization" in {
+ val contributor: JObject = ("name" -> "contributor1") ~ ("email" -> "name1@domain1")
+ val transformed = transformAddOrganization(contributor)
+ transformed \ "organization" should be(JString("domain1"))
+ }
+
private def newSource(contributorsJson: JObject*): String = {
val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
s"${getClass.getName}-${System.nanoTime()}")