Remove TLD from organisation extracted from email

Keeping the top-level domain (TLD) in the organisation name can split
a single organisation across several aggregation buckets.

For example, both smith@company.com and smith@company.co.uk should
have 'company' as their organisation.

Change-Id: Icb9c0bfe928dfcbf02d79d30fd9e2fbd54975159
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index c9db628..3c37d89 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -20,12 +20,13 @@
 import java.time.format.DateTimeFormatter
 import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
 
-import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContributionSource}
+import com.gerritforge.analytics.model.{Email, GerritEndpointConfig, ProjectContributionSource}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{udf, _}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
 
 object GerritAnalyticsTransformations {
 
@@ -126,11 +127,9 @@
       df.withColumn("organization", emailToDomainUdf(col("email")))
   }
 
-  private def emailToDomain(email: String): String = {
-    email split "@" match {
-      case parts if (parts.length == 2) => parts.last.toLowerCase
-      case _ => ""
-    }
+  private def emailToDomain(email: String): String = email match {
+    case Email(_, domain) => domain
+    case _ => ""
   }
 
   private def emailToDomainUdf = udf(emailToDomain(_: String))
diff --git a/src/main/scala/com/gerritforge/analytics/model/Email.scala b/src/main/scala/com/gerritforge/analytics/model/Email.scala
new file mode 100644
index 0000000..e439ae7
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/model/Email.scala
@@ -0,0 +1,30 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.model
+
+/** Email split into its user part and its organisation (the domain minus its TLD). */
+case class Email(user: String, domain: String)
+object Email {
+  val emailWithOutExtension = """(.*?)@([^.]+)$""".r
+  // The dot before the 2-4 letter TLD is escaped so it matches only a literal '.'.
+  val emailWithExtension = """(.*?)@(.*?)(?:\.co)?\.[a-z]{2,4}$""".r
+  /** Extracts (user, organisation) from the lower-cased email; None when it does not parse. */
+  def unapply(emailString: String): Option[(String, String)] =
+    emailString.toLowerCase match {
+      case emailWithOutExtension(u, d) => Some((u, d))
+      case emailWithExtension(u, d)    => Some((u, d))
+      case _                           => None
+    }
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index f940d75..aa4cc3a 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -206,7 +206,14 @@
       "",
       "@", // corner case
       "not an email",
-      "email@domain"
+      "email@domain-simple",
+      "email@domain-com.com",
+      "email@domain-couk.co.uk",
+      "email@domain-info.info",
+      "email@mail.companyname-couk.co.uk",
+      "email@mail.companyname-com.com",
+      "email@mail.companyname-info.info"
+
     )).toDF("email")
 
     val transformed = df.addOrganization()
@@ -217,7 +224,13 @@
       Row("", ""),
       Row("@", ""),
       Row("not an email", ""),
-      Row("email@domain", "domain")
+      Row("email@domain-simple", "domain-simple"),
+      Row("email@domain-com.com", "domain-com"),
+      Row("email@domain-couk.co.uk", "domain-couk"),
+      Row("email@domain-info.info", "domain-info"),
+      Row("email@mail.companyname-couk.co.uk", "mail.companyname-couk"),
+      Row("email@mail.companyname-com.com", "mail.companyname-com"),
+      Row("email@mail.companyname-info.info", "mail.companyname-info")
     )
   }
 
diff --git a/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala b/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
new file mode 100644
index 0000000..e18e328
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
@@ -0,0 +1,84 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.model
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.util.Try
+
+class EmailSpec extends FlatSpec with Matchers {
+  "Email" should "be instantiated when domain has no extension" in {
+    "user@domain-simple" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "domain-simple"
+    }
+  }
+
+  it should "be instantiated with com extension" in {
+    "user@domain-com.com" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "domain-com"
+    }
+  }
+
+  it should "be instantiated with co.uk extension" in {
+    "user@domain-couk.co.uk" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "domain-couk"
+    }
+  }
+
+  it should "be instantiated with info extension" in {
+    "user@domain-info.info" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "domain-info"
+    }
+  }
+
+  it should "be instantiated with com extension and 'dotted' company name" in {
+    "user@my.companyname-com.com" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "my.companyname-com"
+    }
+  }
+
+  it should "be instantiated with co.uk extension and 'dotted' company name" in {
+    "user@my.companyname-couk.co.uk" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "my.companyname-couk"
+    }
+  }
+
+  it should "be instantiated with info extension and 'dotted' company name" in {
+    "user@my.companyname-info.info" match {
+      case Email(user, domain) =>
+        user shouldBe "user"
+        domain shouldBe "my.companyname-info"
+    }
+  }
+
+  it should "not match an invalid mail format" in {
+    "invalid email" match {
+      case Email(_, _) => fail("Invalid emails should be rejected")
+      case _ =>
+    }
+  }
+}
\ No newline at end of file