Refactor using DataFrames

General refactoring that uses DataFrames instead of complex json4s
computations.
DataFrames allow expressing clearer, more readable semantics when
transforming data.
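
As an illustration (a condensed excerpt from the new
GerritAnalyticsTransformations object below), the raw JSON column is
parsed once against a schema and its fields are then selected
declaratively, rather than being rewritten node by node through the
json4s AST:

  val decoded = df.withColumn("json", from_json($"json", schema))
  decoded.selectExpr("project", "json.name as name", "json.email as email")
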
Change-Id: Idee33f08bba110f76e68124e3264f6c5203d8534
diff --git a/build.sbt b/build.sbt
index 14e8af9..cb38e0e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,13 +4,15 @@
scalaVersion := "2.11.8"
+val sparkVersion = "2.1.1"
+
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"
+ "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
exclude("org.spark-project.spark", "unused"),
+ "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.elasticsearch" %% "elasticsearch-spark-20" % "5.0.2"
excludeAll ExclusionRule(organization = "org.apache.spark"),
-
- // fixed versions for apache spark 2.1.1
+ // json4s still needed by GerritProjects
"org.json4s" %% "json4s-native" % "3.2.11",
"com.github.scopt" %% "scopt" % "3.6.0",
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
new file mode 100644
index 0000000..7736300
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -0,0 +1,127 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine
+
+import java.io.{BufferedReader, IOException, InputStreamReader}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContributionSource}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.collection.JavaConverters._
+
+object GerritAnalyticsTransformations {
+
+ implicit class PimpedRDDString(val rdd: RDD[String]) extends AnyVal {
+
+ def enrichWithSource(implicit config: GerritEndpointConfig): RDD[ProjectContributionSource] = {
+ rdd.map { projectName =>
+ ProjectContributionSource(projectName, config.contributorsUrl(projectName))
+ }
+ }
+ }
+
+ def getLinesFromURL(sourceURL: String): Iterator[String] = {
+ val is = new URL(sourceURL).openConnection.getInputStream
+ new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
+ .lines.iterator().asScala
+ .filterNot(_.trim.isEmpty)
+ }
+
+ def getProjectJsonContributorsArray(project: String, sourceURL: String): Array[(String, String)] = {
+ try {
+ getLinesFromURL(sourceURL)
+ .map(s => (project, s))
+ .toArray
+ } catch {
+      case _: IOException => Array() // unreachable or failing source: the project contributes no rows
+ }
+ }
+
+ case class CommitInfo(sha1: String, date: Long, isMerge: Boolean)
+
+ case class UserActivitySummary(year: Integer,
+ month: Integer,
+ day: Integer,
+ hour: Integer,
+ name: String,
+ email: String,
+ num_commits: Integer,
+ num_files: Integer,
+ added_lines: Integer,
+ deleted_lines: Integer,
+ commits: Array[CommitInfo],
+ last_commit_date: Long)
+
+ import org.apache.spark.sql.Encoders
+
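+  // Spark SQL schema derived from the UserActivitySummary case class;
+  // from_json uses it below to parse the raw contributor JSON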
+ val schema = Encoders.product[UserActivitySummary].schema
+
+ implicit class PimpedDataFrame(val df: DataFrame) extends AnyVal {
+ def transformWorkableDF(implicit spark: SparkSession): DataFrame = {
+ import org.apache.spark.sql.functions.from_json
+ import spark.sqlContext.implicits._
+ val decoded = df.withColumn("json", from_json($"json", schema))
+ decoded.selectExpr(
+ "project", "json.name as name", "json.email as email",
+ "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
+ "json.num_files as num_files", "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
+ "json.num_commits as num_commits", "json.last_commit_date as last_commit_date")
+ }
+
+ def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
+ df.withColumn(columnName, longDateToISOUdf(col(columnName)))
+ }
+
+ def addOrganization()(implicit spark: SparkSession): DataFrame = {
+ df.withColumn("organization", emailToDomainUdf(col("email")))
+ }
+ }
+
+ private def emailToDomain(email: String): String = {
+ email split "@" match {
+ case parts if (parts.length == 2) => parts.last.toLowerCase
+ case _ => ""
+ }
+ }
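+  // e.g. emailToDomain("email@domain") == "domain", while "@", "" and
+  // "not an email" all map to "" (see the spec below)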
+
+ private def emailToDomainUdf = udf(emailToDomain(_: String))
+
+
+ implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
+
+ def fetchRawContributors(implicit spark: SparkSession): RDD[(String, String)] = {
+ projectsAndUrls.flatMap {
+ p => getProjectJsonContributorsArray(p.name, p.contributorsUrl)
+ }
+ }
+ }
+
+ val longDateToISOUdf = udf(longDateToISO(_: Number))
+
+ def longDateToISO(in: Number): String =
+ ZonedDateTime.ofInstant(
+ LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
+ ZoneOffset.UTC, ZoneId.of("Z")
+ ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
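+  // Worked examples (the same values are asserted in the test suite):
+  //   longDateToISO(0L)             == "1970-01-01T00:00:00Z"
+  //   longDateToISO(1500000000000L) == "2017-07-14T02:40:00Z"
+  //   longDateToISO(1600000000000L) == "2020-09-13T12:26:40Z"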
+
+}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
deleted file mode 100644
index 57b2803..0000000
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine
-
-import java.io.{BufferedReader, IOException, InputStreamReader}
-import java.net.URL
-import java.nio.charset.StandardCharsets
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
-
-import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
-import org.apache.spark.rdd.RDD
-import org.json4s.JsonAST.{JField, JInt, JString}
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-import scala.collection.JavaConverters._
-
-object GerritAnalyticsTrasformations {
-
- private[analytics] def longDateToISO(in: Number): String =
- ZonedDateTime.ofInstant(
- LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
- ZoneOffset.UTC, ZoneId.of("Z")
- ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
-
- private[analytics] val emailToDomain: PartialFunction[JValue,String] = {
- case JString(email) if email.contains("@") && !email.endsWith("@") =>
- email.split("@").last.toLowerCase
- }
-
- private[analytics] def transformLongDateToISO(in: String): JObject = {
- parse(in).transformField {
- case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
- JField(fieldName, JString(longDateToISO(v)))
- }.asInstanceOf[JObject]
- }
-
- private[analytics] def transformAddOrganization(in: JObject): JObject = {
- Some(in \ "email")
- .collect(emailToDomain)
- .fold(in)(org => ("organization" -> org) ~ in)
- }
-
- def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
- val is = new URL(sourceUrl).openConnection.getInputStream
- new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
- .lines.iterator().asScala
- .filterNot(_.trim.isEmpty)
- .map(transformLongDateToISO)
- .map(transformAddOrganization)
- .map(ProjectContribution(projectName, _))
-
- }
-
- implicit class PimpedRddOfProjectContributionSource(val rdd: RDD[ProjectContributionSource]) extends AnyVal {
-
- def fetchContributors: RDD[ProjectContribution] = {
- rdd.flatMap {
- case ProjectContributionSource(projectName, sourceUrl) =>
- try {
- getFileContentAsProjectContributions(sourceUrl, projectName)
- } catch {
- case ioex: IOException => None
- }
- }
- }
- }
-
- implicit class PimpedRddOfProjects(val rdd: RDD[String]) extends AnyVal {
-
- def enrichWithSource(config: GerritEndpointConfig) = {
- rdd.map { projectName =>
- ProjectContributionSource(projectName, config.contributorsUrl(projectName))
- }
- }
- }
-
- implicit class PimpedRddOfProjects2Json(val rdd: RDD[ProjectContribution]) extends AnyVal {
- def toJson() = {
- rdd.map(pc =>
- compact(render(
- ("project" -> pc.projectName) ~ pc.authorContribution)
- )
- )
- }
- }
-
-}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index df5406a..089b21c 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,12 +14,11 @@
package com.gerritforge.analytics.job
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContribution}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext}
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects}
+import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.io.{Codec, Source}
-import org.elasticsearch.spark._
object Main extends App with Job {
@@ -45,14 +44,13 @@
} text "aggregate email/email_hour/email_day/email_month/email_year"
}.parse(args, GerritEndpointConfig()) match {
case Some(config) =>
- val sparkConf = new SparkConf().setAppName("Gerrit Analytics ETL")
- val sc = new SparkContext(sparkConf)
-
- val outRDD = run(config, sc)
- outRDD.saveAsTextFile(config.outputDir)
- saveES(config,outRDD)
-
-
+ implicit val spark = SparkSession.builder()
+ .appName("Gerrit Analytics ETL")
+ .getOrCreate()
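+        // no master set here: it is expected to be provided externally,
+        // e.g. via spark-submit --master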
+        implicit val implicitConfig = config
+ val dataFrame = run()
+ dataFrame.write.json(config.outputDir)
+ saveES(dataFrame)
case None => // invalid configuration usage has been displayed
}
}
@@ -60,15 +58,25 @@
trait Job {
implicit val codec = Codec.ISO8859
- import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+ def run()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+ import spark.sqlContext.implicits._ // toDF
- def run(implicit config: GerritEndpointConfig, sc: SparkContext): RDD[ProjectContribution] = {
- val rdd: RDD[String] = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
+ val sc = spark.sparkContext
+ val projects = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
- rdd.enrichWithSource(config).fetchContributors
+ val processedDF = projects
+ .enrichWithSource
+ .fetchRawContributors
+ .toDF("project", "json")
+ .transformWorkableDF
+ .convertDates("last_commit_date")
+ .addOrganization()
+
+ processedDF
}
- def saveES(implicit config: GerritEndpointConfig, rdd: RDD[ProjectContribution]) = {
- config.elasticIndex.map(rdd.toJson().saveJsonToEs(_))
+  def saveES(df: DataFrame)(implicit config: GerritEndpointConfig): Unit = {
+    import org.elasticsearch.spark.sql._
+    config.elasticIndex.foreach(df.saveToEs(_))
}
}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
index 5d677af..0e80468 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
@@ -19,9 +19,6 @@
import scala.io.Source
-/**
- * Created by lucamilanesio on 22/08/2017.
- */
object GerritProjects {
type GerritProjects = Seq[String]
@@ -38,5 +35,3 @@
case class ProjectContributionSource(name: String, contributorsUrl: String)
-case class ProjectContribution(projectName: String, authorContribution: JObject)
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 1bc4d8b..b2c80e1 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -16,81 +16,152 @@
import java.io.{File, FileWriter}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContributionSource}
+import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.scalatest.{FlatSpec, Inside, Matchers, PartialFunctionValues}
+import org.scalatest.{FlatSpec, Inside, Matchers}
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+import org.json4s.jackson.JsonMethods.{compact, render}
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside with PartialFunctionValues {
+import scala.io.Source
- import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers
+ with SparkTestSupport with Inside {
- "A project" should "be enriched with its contributors URL endpoint" in {
- withSparkContext { sc =>
- val projectRdd = sc.parallelize(Seq("project"))
- val config = GerritEndpointConfig("http://somewhere.com")
+ "GerritProjects" should "parse project names" in {
- val projectWithSource = projectRdd.enrichWithSource(config).collect
+ val projectNames = GerritProjects(Source.fromString(
+ """)]}'
+ |{
+ | "All-Projects": {
+ | "id": "All-Projects",
+ | },
+ | "Test": {
+ | "id": "Test",
+ | }
+ |}
+ |""".stripMargin))
- projectWithSource should have size 1
+ projectNames should have size 2
+ projectNames should contain allOf("All-Projects", "Test")
+ }
- inside(projectWithSource.head) {
- case ProjectContributionSource(project, url) => {
- project should be("project")
- url should startWith("http://somewhere.com")
- }
+
+  "enrichWithSource" should "enrich each project in the RDD with its contributors URL" in {
+
+ val projectRdd = sc.parallelize(Seq("project"))
+ implicit val config = GerritEndpointConfig("http://somewhere.com")
+
+ val projectWithSource = projectRdd
+ .enrichWithSource
+ .collect
+
+ projectWithSource should have size 1
+ inside(projectWithSource.head) {
+ case ProjectContributionSource(project, url) => {
+ project should be("project")
+ url should startWith("http://somewhere.com")
}
}
}
- it should "return two lines enriched with its two contributors" in {
- val contributor1: JObject = ("foo" -> "bar")
- val contributor2: JObject = ("first" -> "John")
- val projectSource = ProjectContributionSource("project", newSource(contributor1, contributor2))
+  "fetchRawContributors" should "fetch raw contributor JSON from each project's contributors URL" in {
- withSparkContext { sc =>
- val projectContributions = sc.parallelize(Seq(projectSource))
- .fetchContributors
- .collect()
+ val line1 = "foo" -> "bar"
+ val line2 = "foo1" -> "bar1"
+ val line3 = "foo2" -> "bar2\u0100" // checks UTF-8 as well
+ val line3b = "foo3" -> "bar3\u0101"
- projectContributions should contain allOf(
- ProjectContribution("project", contributor1),
- ProjectContribution("project", contributor2)
- )
- }
- }
+ val projectSource1 = ProjectContributionSource("p1", newSource(line1, line2, line3))
+ val projectSource2 = ProjectContributionSource("p2", newSource(line3b))
- "A ProjectContribution RDD" should "serialize to Json" in {
- val pc1 = ProjectContribution("project", ("foo" -> "bar"))
- val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
- withSparkContext { sc =>
- sc.parallelize(Seq(pc1, pc2)).toJson.collect should contain allOf(
- """{"project":"project","foo":"bar"}""",
- """{"project":"project","foo2":"bar2"}""")
- }
- }
+ val rawContributors = sc.parallelize(Seq(projectSource1, projectSource2))
+ .fetchRawContributors
+ .collect
- "getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
- val contributor1: JObject = ("foo" -> "bar")
- val contributor2: JObject = ("first" -> "(A with macron) in unicode is: \u0100")
- val url = newSource(contributor1, contributor2)
- val projectContributions = getFileContentAsProjectContributions(url, "project").toArray
-
- projectContributions should contain allOf(
- ProjectContribution("project", contributor1),
- ProjectContribution("project", contributor2)
+ rawContributors should have size (4)
+ rawContributors should contain allOf(
+ ("p1","""{"foo":"bar"}"""),
+ ("p1","""{"foo1":"bar1"}"""),
+ ("p1", "{\"foo2\":\"bar2\u0100\"}"),
+ ("p2", "{\"foo3\":\"bar3\u0101\"}")
)
}
- "emailToDomain" should "parse domain" in {
- emailToDomain.valueAt("a@x.y") should be ("x.y")
+  "transformWorkableDF" should "transform a DataFrame of (project, json) rows into a workable DF with separate columns" in {
+
+ import sql.implicits._
+
+ val rdd = sc.parallelize(Seq(
+ ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false}]}"""),
+ ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true}]}"""),
+    // the last entry omits year/month/day/hour to check optionality
+ ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true}]}""")
+ ))
+
+ val df = rdd.toDF("project", "json")
+ .transformWorkableDF
+
+ df.count should be(3)
+ val collected = df.collect
+
+ df.schema.fields.map(_.name) should contain allOf(
+ "project", "name", "email",
+ "year", "month", "day", "hour",
+ "num_files", "added_lines", "deleted_lines",
+ "num_commits", "last_commit_date")
+
+ collected should contain allOf(
+ Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 1, 1, 1, 0),
+ Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 1, 1, 428, 1500000000000L),
+ Row("p3", "c", "c@mail.com", null, null, null, null, 2, 1, 1, 12, 1600000000000L)
+ )
}
- "transformAddOrganization" should "add organization" in {
- val contributor: JObject = ("name" -> "contributor1") ~ ("email" -> "name1@domain1")
- val transformed = transformAddOrganization(contributor)
- transformed \ "organization" should be(JString("domain1"))
+ "addOrganization" should "compute organization column from the email" in {
+ import sql.implicits._
+
+ val df = sc.parallelize(Seq(
+ "",
+ "@", // corner case
+ "not an email",
+ "email@domain"
+ )).toDF("email")
+
+ val transformed = df.addOrganization()
+
+ transformed.schema.fields.map(_.name) should contain allOf("email","organization")
+
+ transformed.collect should contain allOf(
+ Row("", ""),
+ Row("@", ""),
+ Row("not an email", ""),
+ Row("email@domain", "domain")
+ )
+
+ }
+
+  "convertDates" should "convert the given column from epoch milliseconds to an ISO-8601 date string" in {
+    // notable dates as Unix epoch milliseconds and their ISO-8601 rendering
+ val DATES = Map(
+ 0L -> "1970-01-01T00:00:00Z",
+ 1500000000000L -> "2017-07-14T02:40:00Z",
+ 1600000000000L -> "2020-09-13T12:26:40Z")
+ import sql.implicits._
+ val df = sc.parallelize(Seq(
+ ("a", 0L, 1),
+ ("b", 1500000000000L, 2),
+ ("c", 1600000000000L, 3))).toDF("name", "date", "num")
+
+ val converted = df
+ .convertDates("date")
+
+ converted.collect should contain allOf(
+ Row("a", DATES(0), 1),
+ Row("b", DATES(1500000000000L), 2),
+ Row("c", DATES(1600000000000L), 3)
+ )
}
private def newSource(contributorsJson: JObject*): String = {
diff --git a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
deleted file mode 100644
index 3a827a6..0000000
--- a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics
-
-import com.gerritforge.analytics.model.GerritProjects
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.{Codec, Source}
-
-class GerritProjectsSpec extends FlatSpec with Matchers {
-
- "GerritProjects" should "parseProjects" in {
- val projectNames = GerritProjects(Source.fromString(
- """)]}'
- |{
- | "All-Projects": {
- | "id": "All-Projects",
- | "state": "ACTIVE"
- | },
- | "Test": {
- | "id": "Test",
- | "state": "ACTIVE"
- | }
- |}
- |""".stripMargin))
-
- projectNames should have size 2
- projectNames should contain allOf("All-Projects", "Test")
- }
-
-}
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index e55cdbc..d800069 100644
--- a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -14,19 +14,14 @@
package com.gerritforge.analytics
-import org.apache.commons.lang.RandomStringUtils
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.SparkSession
trait SparkTestSupport {
-
- def withSparkContext(test: SparkContext => Unit): Unit = {
- val sc = new SparkContext("local[4]", RandomStringUtils.randomAlphabetic(10))
- try {
- test(sc)
- } finally {
- sc.stop()
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
- }
+ Logger.getLogger("org").setLevel(Level.ERROR)
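+  // a single local session shared by every spec that mixes in this trait;
+  // getOrCreate() returns the already-running session on later calls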
+ implicit val spark = SparkSession.builder()
+ .master("local[4]")
+ .getOrCreate()
+ implicit val sc = spark.sparkContext
+ implicit val sql = spark.sqlContext
}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
deleted file mode 100644
index 8bdf688..0000000
--- a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine
-
-import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
-import org.json4s.JsonAST.{JArray, JString}
-import org.json4s.native.JsonMethods.{compact, render}
-import org.scalatest.matchers.{MatchResult, Matcher}
-import org.scalatest.{FlatSpec, Inside, Matchers}
-
-class DateConverterTest extends FlatSpec with Matchers with Inside {
-
- "converter" should "convert 3 canonical numbers in a nested json object" in {
- val DATES = Map(
- 0L -> "1970-01-01T00:00:00Z",
- 1500000000000L -> "2017-07-14T02:40:00Z",
- 1600000000000L -> "2020-09-13T12:26:40Z")
- val json =
- """{
- "date": 0,
- "name": "foo",
- "commits": [
- {"sha1": "xxx", "last_commit_date": 1500000000000},
- {"sha1": "yyy", "last_commit_date": 1600000000000}
- }
- }"""
- val out = transformLongDateToISO(json)
- inside (out \ "date") { case JString(s0) => s0 should equal(DATES(0L)) }
- inside (out \ "commits") { case JArray(List(o1,o2)) =>
- inside (o1 \ "last_commit_date") { case JString(s15) => s15 should equal(DATES(1500000000000L))}
- inside (o2 \ "last_commit_date") { case JString(s16) => s16 should equal(DATES(1600000000000L))}
- }
- }
-}