Extract organization field to ElasticSearch.

Obtained by the @domain part of committer email.

Change-Id: Ib5b3b9996ec78780007cd6f95a47e2dccd570364
diff --git a/kibana/organizations-visualizations.json b/kibana/organizations-visualizations.json
new file mode 100644
index 0000000..c6a3752
--- /dev/null
+++ b/kibana/organizations-visualizations.json
@@ -0,0 +1,30 @@
+[
+  {
+    "_id": "6d3a02f0-8b37-11e7-b340-5faa4252a25c",
+    "_type": "visualization",
+    "_source": {
+      "title": "Organization Pie",
+      "visState": "{\"title\":\"Organization Pie\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"organization.keyword\",\"size\":500,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
+      "uiStateJSON": "{}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\"index\":\"gerrit\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+      }
+    }
+  },
+  {
+    "_id": "d0c09180-8b38-11e7-b340-5faa4252a25c",
+    "_type": "visualization",
+    "_source": {
+      "title": "Organization Table",
+      "visState": "{\"title\":\"Organization Table\",\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false,\"sort\":{\"columnIndex\":null,\"direction\":null},\"showTotal\":false,\"totalFunc\":\"sum\"},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"num_commits\",\"customLabel\":\"# Commits\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"organization.keyword\",\"size\":10,\"order\":\"desc\",\"orderBy\":\"1\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"cardinality\",\"schema\":\"metric\",\"params\":{\"field\":\"project.keyword\",\"customLabel\":\"# Projects\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"cardinality\",\"schema\":\"metric\",\"params\":{\"field\":\"email.keyword\",\"customLabel\":\"# Authors\"}}],\"listeners\":{}}",
+      "uiStateJSON": "{\"vis\":{\"params\":{\"sort\":{\"columnIndex\":null,\"direction\":null}}}}",
+      "description": "",
+      "version": 1,
+      "kibanaSavedObjectMeta": {
+        "searchSourceJSON": "{\"index\":\"gerrit\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
+      }
+    }
+  }
+]
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
index 8756178..01e6dc5 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
@@ -3,8 +3,8 @@
 import java.io.{BufferedReader, IOException, InputStreamReader}
 import java.net.URL
 import java.nio.charset.StandardCharsets
-import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
 import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
 
 import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
 import org.apache.spark.rdd.RDD
@@ -23,6 +23,11 @@
       ZoneOffset.UTC, ZoneId.of("Z")
     ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
 
+  private[analytics] val emailToDomain: PartialFunction[JValue,String] = {
+    case JString(email) if email.contains("@") && !email.endsWith("@") =>
+      email.split("@").last.toLowerCase
+  }
+
   private[analytics] def transformLongDateToISO(in: String): JObject = {
     parse(in).transformField {
       case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
@@ -30,12 +35,19 @@
     }.asInstanceOf[JObject]
   }
 
+  private[analytics] def transformAddOrganization(in: JObject): JObject = {
+    Some(in \ "email")
+      .collect(emailToDomain)
+      .fold(in)(org => ("organization" -> org) ~ in)
+  }
+
   def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
     val is = new URL(sourceUrl).openConnection.getInputStream
     new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
       .lines.iterator().asScala
       .filterNot(_.trim.isEmpty)
       .map(transformLongDateToISO)
+      .map(transformAddOrganization)
       .map(ProjectContribution(projectName, _))
 
   }
@@ -74,6 +86,3 @@
   }
 
 }
-
-
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index e26013e..b846df6 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -6,9 +6,9 @@
 import org.json4s.JsonDSL._
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{FlatSpec, Inside, Matchers}
+import org.scalatest.{FlatSpec, Inside, Matchers, PartialFunctionValues}
 
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside with PartialFunctionValues {
 
   import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
 
@@ -30,21 +30,6 @@
     }
   }
 
-  it should "return one line enriched with its contributor" in {
-    val contributorsJson: JObject = ("foo" -> "bar")
-    val projectSource = ProjectContributionSource("project", newSource(contributorsJson))
-
-    withSparkContext { sc =>
-      val projectContributions = sc.parallelize(Seq(projectSource))
-        .fetchContributors
-        .collect()
-
-      projectContributions should contain(
-        ProjectContribution("project", contributorsJson)
-      )
-    }
-  }
-
   it should "return two lines enriched with its two contributors" in {
     val contributor1: JObject = ("foo" -> "bar")
     val contributor2: JObject = ("first" -> "John")
@@ -66,16 +51,17 @@
     val pc1 = ProjectContribution("project", ("foo" -> "bar"))
     val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
     withSparkContext { sc =>
-      sc.parallelize(Seq(pc1,pc2)).toJson.collect should contain allOf(
-      """{"project":"project","foo":"bar"}""",
-      """{"project":"project","foo2":"bar2"}""")
+      sc.parallelize(Seq(pc1, pc2)).toJson.collect should contain allOf(
+        """{"project":"project","foo":"bar"}""",
+        """{"project":"project","foo2":"bar2"}""")
     }
   }
+
   "getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
     val contributor1: JObject = ("foo" -> "bar")
-    val contributor2: JObject = ("first" -> "A with macron in unicode is: \u0100")
+    val contributor2: JObject = ("first" -> "(A with macron) in unicode is: \u0100")
     val url = newSource(contributor1, contributor2)
-    val projectContributions = getFileContentAsProjectContributions(url,"project").toArray
+    val projectContributions = getFileContentAsProjectContributions(url, "project").toArray
 
     projectContributions should contain allOf(
       ProjectContribution("project", contributor1),
@@ -83,6 +69,16 @@
     )
   }
 
+  "emailToDomain" should "parse domain" in {
+    emailToDomain.valueAt("a@x.y") should be ("x.y")
+  }
+
+  "transformAddOrganization" should "add organization" in {
+    val contributor: JObject = ("name" -> "contributor1") ~ ("email" -> "name1@domain1")
+    val transformed = transformAddOrganization(contributor)
+    transformed \ "organization" should be(JString("domain1"))
+  }
+
   private def newSource(contributorsJson: JObject*): String = {
     val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
       s"${getClass.getName}-${System.nanoTime()}")