Refactoring using DataFrames.

General refactoring making use of DataFrame instead of complex json4s
computations.
DataFrames allow to specify a better and more readable semantics while
transforming data.

Change-Id: Idee33f08bba110f76e68124e3264f6c5203d8534
diff --git a/build.sbt b/build.sbt
index 14e8af9..cb38e0e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,13 +4,15 @@
 
 scalaVersion := "2.11.8"
 
+val sparkVersion = "2.1.1"
+
 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
     exclude("org.spark-project.spark", "unused"),
+  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
   "org.elasticsearch" %% "elasticsearch-spark-20" % "5.0.2"
     excludeAll ExclusionRule(organization = "org.apache.spark"),
-
-  // fixed versions for apache spark 2.1.1
+  // json4s still needed by GerritProjects
   "org.json4s" %% "json4s-native" % "3.2.11",
 
   "com.github.scopt" %% "scopt" % "3.6.0",
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
new file mode 100644
index 0000000..7736300
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -0,0 +1,127 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine
+
+import java.io.{BufferedReader, IOException, InputStreamReader}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContributionSource}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.{udf, _}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.collection.JavaConverters._
+
+object GerritAnalyticsTransformations {
+
+  implicit class PimpedRDDString(val rdd: RDD[String]) extends AnyVal {
+
+    def enrichWithSource(implicit config: GerritEndpointConfig): RDD[ProjectContributionSource] = {
+      rdd.map { projectName =>
+        ProjectContributionSource(projectName, config.contributorsUrl(projectName))
+      }
+    }
+  }
+
+  def getLinesFromURL(sourceURL: String): Iterator[String] = {
+    val is = new URL(sourceURL).openConnection.getInputStream
+    new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
+      .lines.iterator().asScala
+      .filterNot(_.trim.isEmpty)
+  }
+
+  def getProjectJsonContributorsArray(project: String, sourceURL: String): Array[(String, String)] = {
+    try {
+      getLinesFromURL(sourceURL)
+        .map(s => (project, s))
+        .toArray
+    } catch {
+      case e: IOException => Array()
+    }
+  }
+
+  case class CommitInfo(sha1: String, date: Long, isMerge: Boolean)
+
+  case class UserActivitySummary(year: Integer,
+                                 month: Integer,
+                                 day: Integer,
+                                 hour: Integer,
+                                 name: String,
+                                 email: String,
+                                 num_commits: Integer,
+                                 num_files: Integer,
+                                 added_lines: Integer,
+                                 deleted_lines: Integer,
+                                 commits: Array[CommitInfo],
+                                 last_commit_date: Long)
+
+  import org.apache.spark.sql.Encoders
+
+  val schema = Encoders.product[UserActivitySummary].schema
+
+  implicit class PimpedDataFrame(val df: DataFrame) extends AnyVal {
+    def transformWorkableDF(implicit spark: SparkSession): DataFrame = {
+      import org.apache.spark.sql.functions.from_json
+      import spark.sqlContext.implicits._
+      val decoded = df.withColumn("json", from_json($"json", schema))
+      decoded.selectExpr(
+        "project", "json.name as name", "json.email as email",
+        "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
+        "json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
+        "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
+    }
+
+    def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
+      df.withColumn(columnName, longDateToISOUdf(col(columnName)))
+    }
+
+    def addOrganization()(implicit spark: SparkSession): DataFrame = {
+      df.withColumn("organization", emailToDomainUdf(col("email")))
+    }
+  }
+
+  private def emailToDomain(email: String): String = {
+    email split "@" match {
+      case parts if (parts.length == 2) => parts.last.toLowerCase
+      case _ => ""
+    }
+  }
+
+  private def emailToDomainUdf = udf(emailToDomain(_: String))
+
+
+  implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
+
+    def fetchRawContributors(implicit spark: SparkSession): RDD[(String, String)] = {
+      projectsAndUrls.flatMap {
+        p => getProjectJsonContributorsArray(p.name, p.contributorsUrl)
+      }
+    }
+  }
+
+  import org.apache.spark.sql.functions.udf
+
+  val longDateToISOUdf = udf(longDateToISO(_: Number))
+
+  def longDateToISO(in: Number): String =
+    ZonedDateTime.ofInstant(
+      LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
+      ZoneOffset.UTC, ZoneId.of("Z")
+    ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
+
+}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
deleted file mode 100644
index 57b2803..0000000
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine
-
-import java.io.{BufferedReader, IOException, InputStreamReader}
-import java.net.URL
-import java.nio.charset.StandardCharsets
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
-
-import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
-import org.apache.spark.rdd.RDD
-import org.json4s.JsonAST.{JField, JInt, JString}
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-import scala.collection.JavaConverters._
-
-object GerritAnalyticsTrasformations {
-
-  private[analytics] def longDateToISO(in: Number): String =
-    ZonedDateTime.ofInstant(
-      LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
-      ZoneOffset.UTC, ZoneId.of("Z")
-    ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
-
-  private[analytics] val emailToDomain: PartialFunction[JValue,String] = {
-    case JString(email) if email.contains("@") && !email.endsWith("@") =>
-      email.split("@").last.toLowerCase
-  }
-
-  private[analytics] def transformLongDateToISO(in: String): JObject = {
-    parse(in).transformField {
-      case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
-        JField(fieldName, JString(longDateToISO(v)))
-    }.asInstanceOf[JObject]
-  }
-
-  private[analytics] def transformAddOrganization(in: JObject): JObject = {
-    Some(in \ "email")
-      .collect(emailToDomain)
-      .fold(in)(org => ("organization" -> org) ~ in)
-  }
-
-  def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
-    val is = new URL(sourceUrl).openConnection.getInputStream
-    new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
-      .lines.iterator().asScala
-      .filterNot(_.trim.isEmpty)
-      .map(transformLongDateToISO)
-      .map(transformAddOrganization)
-      .map(ProjectContribution(projectName, _))
-
-  }
-
-  implicit class PimpedRddOfProjectContributionSource(val rdd: RDD[ProjectContributionSource]) extends AnyVal {
-
-    def fetchContributors: RDD[ProjectContribution] = {
-      rdd.flatMap {
-        case ProjectContributionSource(projectName, sourceUrl) =>
-          try {
-            getFileContentAsProjectContributions(sourceUrl, projectName)
-          } catch {
-            case ioex: IOException => None
-          }
-      }
-    }
-  }
-
-  implicit class PimpedRddOfProjects(val rdd: RDD[String]) extends AnyVal {
-
-    def enrichWithSource(config: GerritEndpointConfig) = {
-      rdd.map { projectName =>
-        ProjectContributionSource(projectName, config.contributorsUrl(projectName))
-      }
-    }
-  }
-
-  implicit class PimpedRddOfProjects2Json(val rdd: RDD[ProjectContribution]) extends AnyVal {
-    def toJson() = {
-      rdd.map(pc =>
-        compact(render(
-          ("project" -> pc.projectName) ~ pc.authorContribution)
-        )
-      )
-    }
-  }
-
-}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index df5406a..089b21c 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,12 +14,11 @@
 
 package com.gerritforge.analytics.job
 
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContribution}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext}
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.io.{Codec, Source}
-import org.elasticsearch.spark._
 
 object Main extends App with Job {
 
@@ -45,14 +44,13 @@
     } text "aggregate email/email_hour/email_day/email_month/email_year"
   }.parse(args, GerritEndpointConfig()) match {
     case Some(config) =>
-      val sparkConf = new SparkConf().setAppName("Gerrit Analytics ETL")
-      val sc = new SparkContext(sparkConf)
-
-      val outRDD = run(config, sc)
-      outRDD.saveAsTextFile(config.outputDir)
-      saveES(config,outRDD)
-
-
+      implicit val spark = SparkSession.builder()
+        .appName("Gerrit Analytics ETL")
+        .getOrCreate()
+      implicit val implicitConfig = config;
+      val dataFrame = run()
+      dataFrame.write.json(config.outputDir)
+      saveES(dataFrame)
     case None => // invalid configuration usage has been displayed
   }
 }
@@ -60,15 +58,25 @@
 trait Job {
   implicit val codec = Codec.ISO8859
 
-  import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+  def run()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+    import spark.sqlContext.implicits._ // toDF
 
-  def run(implicit config: GerritEndpointConfig, sc: SparkContext): RDD[ProjectContribution] = {
-    val rdd: RDD[String] = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
+    val sc = spark.sparkContext
+    val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
 
-    rdd.enrichWithSource(config).fetchContributors
+    val processedDF = projects
+      .enrichWithSource
+      .fetchRawContributors
+      .toDF("project", "json")
+      .transformWorkableDF
+      .convertDates("last_commit_date")
+      .addOrganization()
+
+    processedDF
   }
-  def saveES(implicit config: GerritEndpointConfig, rdd: RDD[ProjectContribution]) = {
-      config.elasticIndex.map(rdd.toJson().saveJsonToEs(_))
+  def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
+    import org.elasticsearch.spark.sql._
+    config.elasticIndex.map(df.saveToEs(_))
   }
 }
 
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
index 5d677af..0e80468 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
@@ -19,9 +19,6 @@
 
 import scala.io.Source
 
-/**
-  * Created by lucamilanesio on 22/08/2017.
-  */
 object GerritProjects {
 
   type GerritProjects = Seq[String]
@@ -38,5 +35,3 @@
 
 case class ProjectContributionSource(name: String, contributorsUrl: String)
 
-case class ProjectContribution(projectName: String, authorContribution: JObject)
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 1bc4d8b..b2c80e1 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -16,81 +16,152 @@
 
 import java.io.{File, FileWriter}
 
-import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContributionSource}
+import org.apache.spark.sql.Row
 import org.json4s.JsonDSL._
 import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.scalatest.{FlatSpec, Inside, Matchers, PartialFunctionValues}
+import org.scalatest.{FlatSpec, Inside, Matchers}
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+import org.json4s.jackson.JsonMethods.{compact, render}
 
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside with PartialFunctionValues {
+import scala.io.Source
 
-  import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers
+  with SparkTestSupport with Inside {
 
-  "A project" should "be enriched with its contributors URL endpoint" in {
-    withSparkContext { sc =>
-      val projectRdd = sc.parallelize(Seq("project"))
-      val config = GerritEndpointConfig("http://somewhere.com")
+  "GerritProjects" should "parse project names" in {
 
-      val projectWithSource = projectRdd.enrichWithSource(config).collect
+    val projectNames = GerritProjects(Source.fromString(
+      """)]}'
+        |{
+        | "All-Projects": {
+        |   "id": "All-Projects",
+        | },
+        | "Test": {
+        |   "id": "Test",
+        | }
+        |}
+        |""".stripMargin))
 
-      projectWithSource should have size 1
+    projectNames should have size 2
+    projectNames should contain allOf("All-Projects", "Test")
+  }
 
-      inside(projectWithSource.head) {
-        case ProjectContributionSource(project, url) => {
-          project should be("project")
-          url should startWith("http://somewhere.com")
-        }
+
+  "enrichWithSource" should "enrich project RDD object with its source" in {
+
+    val projectRdd = sc.parallelize(Seq("project"))
+    implicit val config = GerritEndpointConfig("http://somewhere.com")
+
+    val projectWithSource = projectRdd
+      .enrichWithSource
+      .collect
+
+    projectWithSource should have size 1
+    inside(projectWithSource.head) {
+      case ProjectContributionSource(project, url) => {
+        project should be("project")
+        url should startWith("http://somewhere.com")
       }
     }
   }
 
-  it should "return two lines enriched with its two contributors" in {
-    val contributor1: JObject = ("foo" -> "bar")
-    val contributor2: JObject = ("first" -> "John")
-    val projectSource = ProjectContributionSource("project", newSource(contributor1, contributor2))
+  "fetchRawContributors" should "fetch file content from the initial list of project names and file names" in {
 
-    withSparkContext { sc =>
-      val projectContributions = sc.parallelize(Seq(projectSource))
-        .fetchContributors
-        .collect()
+    val line1 = "foo" -> "bar"
+    val line2 = "foo1" -> "bar1"
+    val line3 = "foo2" -> "bar2\u0100" // checks UTF-8 as well
+    val line3b = "foo3" -> "bar3\u0101"
 
-      projectContributions should contain allOf(
-        ProjectContribution("project", contributor1),
-        ProjectContribution("project", contributor2)
-      )
-    }
-  }
+    val projectSource1 = ProjectContributionSource("p1", newSource(line1, line2, line3))
+    val projectSource2 = ProjectContributionSource("p2", newSource(line3b))
 
-  "A ProjectContribution RDD" should "serialize to Json" in {
-    val pc1 = ProjectContribution("project", ("foo" -> "bar"))
-    val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
-    withSparkContext { sc =>
-      sc.parallelize(Seq(pc1, pc2)).toJson.collect should contain allOf(
-        """{"project":"project","foo":"bar"}""",
-        """{"project":"project","foo2":"bar2"}""")
-    }
-  }
+    val rawContributors = sc.parallelize(Seq(projectSource1, projectSource2))
+      .fetchRawContributors
+      .collect
 
-  "getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
-    val contributor1: JObject = ("foo" -> "bar")
-    val contributor2: JObject = ("first" -> "(A with macron) in unicode is: \u0100")
-    val url = newSource(contributor1, contributor2)
-    val projectContributions = getFileContentAsProjectContributions(url, "project").toArray
-
-    projectContributions should contain allOf(
-      ProjectContribution("project", contributor1),
-      ProjectContribution("project", contributor2)
+    rawContributors should have size (4)
+    rawContributors should contain allOf(
+      ("p1","""{"foo":"bar"}"""),
+      ("p1","""{"foo1":"bar1"}"""),
+      ("p1", "{\"foo2\":\"bar2\u0100\"}"),
+      ("p2", "{\"foo3\":\"bar3\u0101\"}")
     )
   }
 
-  "emailToDomain" should "parse domain" in {
-    emailToDomain.valueAt("a@x.y") should be ("x.y")
+  "transformWorkableDF" should "transform a DataFrame with project and json to a workable DF with separated columns" in {
+
+    import sql.implicits._
+
+    val rdd = sc.parallelize(Seq(
+      ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false}]}"""),
+      ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true}]}"""),
+      // last commit is missing hour,day,month,year to check optionality
+      ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true}]}""")
+    ))
+
+    val df = rdd.toDF("project", "json")
+      .transformWorkableDF
+
+    df.count should be(3)
+    val collected = df.collect
+
+    df.schema.fields.map(_.name) should contain allOf(
+      "project", "name", "email",
+      "year", "month", "day", "hour",
+      "num_files", "added_lines", "deleted_lines",
+      "num_commits", "last_commit_date")
+
+    collected should contain allOf(
+      Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 1, 1, 1, 0),
+      Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 1, 1, 428, 1500000000000L),
+      Row("p3", "c", "c@mail.com", null, null, null, null, 2, 1, 1, 12, 1600000000000L)
+    )
   }
 
-  "transformAddOrganization" should "add organization" in {
-    val contributor: JObject = ("name" -> "contributor1") ~ ("email" -> "name1@domain1")
-    val transformed = transformAddOrganization(contributor)
-    transformed \ "organization" should be(JString("domain1"))
+  "addOrganization" should "compute organization column from the email" in {
+    import sql.implicits._
+
+    val df = sc.parallelize(Seq(
+      "",
+      "@", // corner case
+      "not an email",
+      "email@domain"
+    )).toDF("email")
+
+    val transformed = df.addOrganization()
+
+    transformed.schema.fields.map(_.name) should contain allOf("email","organization")
+
+    transformed.collect should contain allOf(
+      Row("", ""),
+      Row("@", ""),
+      Row("not an email", ""),
+      Row("email@domain", "domain")
+    )
+
+  }
+
+  "convertDates" should "process specific column from Long to ISO date" in {
+    // some notable dates converted in UnixMillisecs and ISO format
+    val DATES = Map(
+      0L -> "1970-01-01T00:00:00Z",
+      1500000000000L -> "2017-07-14T02:40:00Z",
+      1600000000000L -> "2020-09-13T12:26:40Z")
+    import sql.implicits._
+    val df = sc.parallelize(Seq(
+      ("a", 0L, 1),
+      ("b", 1500000000000L, 2),
+      ("c", 1600000000000L, 3))).toDF("name", "date", "num")
+
+    val converted = df
+      .convertDates("date")
+
+    converted.collect should contain allOf(
+      Row("a", DATES(0), 1),
+      Row("b", DATES(1500000000000L), 2),
+      Row("c", DATES(1600000000000L), 3)
+    )
   }
 
   private def newSource(contributorsJson: JObject*): String = {
diff --git a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
deleted file mode 100644
index 3a827a6..0000000
--- a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics
-
-import com.gerritforge.analytics.model.GerritProjects
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.{Codec, Source}
-
-class GerritProjectsSpec extends FlatSpec with Matchers {
-
-  "GerritProjects" should "parseProjects" in {
-    val projectNames = GerritProjects(Source.fromString(
-      """)]}'
-        |{
-        | "All-Projects": {
-        |   "id": "All-Projects",
-        |   "state": "ACTIVE"
-        | },
-        | "Test": {
-        |   "id": "Test",
-        |   "state": "ACTIVE"
-        | }
-        |}
-        |""".stripMargin))
-
-    projectNames should have size 2
-    projectNames should contain allOf("All-Projects", "Test")
-  }
-
-}
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index e55cdbc..d800069 100644
--- a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -14,19 +14,14 @@
 
 package com.gerritforge.analytics
 
-import org.apache.commons.lang.RandomStringUtils
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.SparkSession
 
 trait SparkTestSupport {
-
-  def withSparkContext(test: SparkContext => Unit): Unit = {
-    val sc = new SparkContext("local[4]", RandomStringUtils.randomAlphabetic(10))
-    try {
-      test(sc)
-    } finally {
-      sc.stop()
-      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-      System.clearProperty("spark.master.port")
-    }
-  }
+  Logger.getLogger("org").setLevel(Level.ERROR)
+  implicit val spark = SparkSession.builder()
+    .master("local[4]")
+    .getOrCreate()
+  implicit val sc = spark.sparkContext
+  implicit val sql = spark.sqlContext
 }
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
deleted file mode 100644
index 8bdf688..0000000
--- a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine
-
-import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
-import org.json4s.JsonAST.{JArray, JString}
-import org.json4s.native.JsonMethods.{compact, render}
-import org.scalatest.matchers.{MatchResult, Matcher}
-import org.scalatest.{FlatSpec, Inside, Matchers}
-
-class DateConverterTest extends FlatSpec with Matchers with Inside {
-
-  "converter" should "convert 3 canonical numbers in a nested json object" in {
-    val DATES = Map(
-      0L -> "1970-01-01T00:00:00Z",
-      1500000000000L -> "2017-07-14T02:40:00Z",
-      1600000000000L -> "2020-09-13T12:26:40Z")
-    val json =
-      """{
-         "date": 0,
-         "name": "foo",
-         "commits": [
-            {"sha1": "xxx", "last_commit_date": 1500000000000},
-            {"sha1": "yyy", "last_commit_date": 1600000000000}
-          }
-      }"""
-    val out = transformLongDateToISO(json)
-    inside (out \ "date") { case JString(s0) => s0 should equal(DATES(0L)) }
-    inside (out \ "commits") { case JArray(List(o1,o2)) =>
-      inside (o1 \ "last_commit_date") { case JString(s15) => s15 should equal(DATES(1500000000000L))}
-      inside (o2 \ "last_commit_date") { case JString(s16) => s16 should equal(DATES(1600000000000L))}
-    }
-  }
-}