Fix inversion of name and ID when parsing project list API response
Project name and ID were swapped in the code that reads the response.
Some code refactoring was done as well.
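
For context, a minimal sketch of the corrected parsing (parseProjects is a
stand-in name mirroring GerritProjectsSupport.parseJsonProjectListResponse
below; the sample payload is illustrative):

    import org.json4s.native.JsonMethods.parse

    case class GerritProject(id: String, name: String)

    // Gerrit's project list response (after stripping the ")]}'" XSSI
    // prefix) maps project NAME -> attributes, e.g.
    //   {"All-Projects-name": {"id": "All-Projects-id"}}
    // The old code bound the tuple as (id, name), inverting the fields.
    def parseProjects(json: String): Seq[GerritProject] =
      parse(json)
        .values
        .asInstanceOf[Map[String, Map[String, String]]]
        .mapValues(attrs => attrs("id"))
        .toSeq
        .map { case (name, id) => GerritProject(id, name) }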
Change-Id: Ic212a890ca577bdf9969713fc0f775297af580a5
Jira-Id: GERICS-633
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 7bcecb9..846fb36 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -23,17 +23,17 @@
import com.gerritforge.analytics.model._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{udf, _}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.collection.JavaConverters._
object GerritAnalyticsTransformations {
- implicit class PimpedRDDString(val rdd: RDD[GerritProject]) extends AnyVal {
+ implicit class PimpedGerritProjectRDD(val rdd: RDD[GerritProject]) extends AnyVal {
- def enrichWithSource(implicit config: GerritEndpointConfig): RDD[ProjectContributionSource] = {
+ def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => String): RDD[ProjectContributionSource] = {
rdd.map { project =>
- ProjectContributionSource(project.name, config.contributorsUrl(project.id))
+ ProjectContributionSource(project.name, projectToContributorsAnalyticsUrlFactory(project.id))
}
}
}
@@ -68,7 +68,7 @@
case class CommitterInfo(author: String, email_alias: String)
- case class CommitInfo(sha1: String, date: Long, isMerge: Boolean)
+ case class CommitInfo(sha1: String, date: Long, merge: Boolean)
case class UserActivitySummary(year: Integer,
month: Integer,
@@ -129,6 +129,14 @@
def addOrganization()(implicit spark: SparkSession): DataFrame =
df.withColumn("organization", emailToDomainUdf(col("email")))
+
+
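+ // Assembles the dashboard stats: organization derived from the e-mail domain, alias resolution, ISO-formatted dates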
+ def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
+ df
+ .addOrganization()
+ .handleAliases(aliasesDFMaybe)
+ .convertDates("last_commit_date")
+ }
}
private def emailToDomain(email: String): String = email match {
@@ -158,4 +166,14 @@
ZoneOffset.UTC, ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
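+ // Fetches each project's contributor stats from the analytics plugin and flattens the JSON into a DataFrame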
+ def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => String)(implicit spark: SparkSession): DataFrame = {
+ import spark.sqlContext.implicits._ // toDF
+
+ projects
+ .enrichWithSource(projectToContributorsAnalyticsUrlFactory)
+ .fetchRawContributors
+ .toDF("project", "json")
+ .transformCommitterInfo
+ }
+
}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 9f65497..1ce5b73 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,17 +14,40 @@
package com.gerritforge.analytics.job
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsRDD}
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneId}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
+import scopt.Read.reads
+import scopt.{OptionParser, Read}
import scala.io.{Codec, Source}
+import scala.util.control.NonFatal
object Main extends App with Job with LazyLogging {
- new scopt.OptionParser[GerritEndpointConfig]("scopt") {
+ private val fileExists: String => Either[String, Unit] = { path =>
+ if (!new java.io.File(path).exists) {
+ Left(s"ERROR: Path '$path' doesn't exists!")
+ } else {
+ Right(())
+ }
+ }
+
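+ // scopt reader for the --since/--until options: accepts ISO dates (yyyy-MM-dd, interpreted as UTC)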
+ implicit val localDateRead: Read[LocalDate] = reads { str =>
+ val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+ try {
+ LocalDate.parse(str, format)
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Invalid date '$str'; expected format is 'yyyy-MM-dd'", e)
+ }
+ }
+
+ private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
head("scopt", "3.x")
opt[String]('u', "url") optional() action { (x, c) =>
c.copy(baseUrl = x)
@@ -38,23 +61,22 @@
opt[String]('e', "elasticIndex") optional() action { (x, c) =>
c.copy(elasticIndex = Some(x))
} text "output directory"
- opt[String]('s', "since") optional() action { (x, c) =>
+ opt[LocalDate]('s', "since") optional() action { (x, c) =>
c.copy(since = Some(x))
} text "begin date "
- opt[String]('u', "until") optional() action { (x, c) =>
+ opt[LocalDate]('u', "until") optional() action { (x, c) =>
c.copy(until = Some(x))
} text "since date"
opt[String]('g', "aggregate") optional() action { (x, c) =>
c.copy(aggregate = Some(x))
} text "aggregate email/email_hour/email_day/email_month/email_year"
- opt[String]('a', "email-aliases") optional() action { (path, c) =>
- if (!new java.io.File(path).exists) {
- println(s"ERROR: Path '${path}' doesn't exists!")
- System.exit(1)
- }
+
+ opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
c.copy(emailAlias = Some(path))
} text "\"emails to author alias\" input data path"
- }.parse(args, GerritEndpointConfig()) match {
+ }
+
+ cliOptionParser.parse(args, GerritEndpointConfig()) match {
case Some(config) =>
implicit val spark: SparkSession = SparkSession.builder()
.appName("Gerrit Analytics ETL")
@@ -64,7 +86,7 @@
logger.info(s"Starting analytics app with config $config")
- val dataFrame = run()
+ val dataFrame = buildProjectStats().cache() //This dataframe is written twice
logger.info(s"ES content created, saving it to '${config.outputDir}'")
dataFrame.write.json(config.outputDir)
@@ -78,22 +100,19 @@
trait Job { self: LazyLogging =>
implicit val codec = Codec.ISO8859
- def run()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
- import spark.sqlContext.implicits._ // toDF
+ def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+ import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+
implicit val sc: SparkContext = spark.sparkContext
- val projects = GerritProjectsRDD(Source.fromURL(config.gerritProjectsUrl))
+ val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
+
+ logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
val aliasesDF = getAliasDF(config.emailAlias)
- projects
- .enrichWithSource
- .fetchRawContributors
- .toDF("project", "json")
- .transformCommitterInfo
- .addOrganization()
- .handleAliases(aliasesDF)
- .convertDates("last_commit_date")
+ getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), config.contributorsUrl)
+ .dashboardStats(aliasesDF)
}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 8134544..316847f 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -14,12 +14,15 @@
package com.gerritforge.analytics.model
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneId}
+
case class GerritEndpointConfig(baseUrl: String = "",
prefix: Option[String] = None,
outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
elasticIndex: Option[String] = None,
- since: Option[String] = None,
- until: Option[String] = None,
+ since: Option[LocalDate] = None,
+ until: Option[LocalDate] = None,
aggregate: Option[String] = None,
emailAlias: Option[String] = None) {
@@ -31,7 +34,9 @@
}
}
- val queryString = Seq("since" -> since, "until" -> until, "aggregate" -> aggregate)
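+ // DateTimeFormatter is not Serializable; @transient lazy keeps it out of the case class's serialized state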
+ @transient
+ private lazy val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+ val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
.flatMap(queryOpt).mkString("?", "&", "")
def contributorsUrl(projectName: String) =
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
index ad8ca50..522870c 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
@@ -14,29 +14,27 @@
package com.gerritforge.analytics.model
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
import org.json4s.native.JsonMethods.parse
import scala.io.Source
case class GerritProject(id: String, name: String)
-object GerritProjectsRDD {
+object GerritProjectsSupport {
val GERRIT_PREFIX = ")]}'\n"
private val GERRIT_PREFIX_LEN = GERRIT_PREFIX.length
- def apply(jsonSource: Source)(implicit sc: SparkContext): RDD[GerritProject] =
- sc.parallelize(
- parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
- .values
- .asInstanceOf[Map[String, Map[String, String]]]
- .mapValues(_ ("id"))
- .toSeq)
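+ // The response object is keyed by project NAME; the "id" attribute carries the project ID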
+ def parseJsonProjectListResponse(jsonSource: Source): Seq[GerritProject] = {
+ parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
+ .values
+ .asInstanceOf[Map[String, Map[String, String]]]
+ .mapValues(projectAttributes => projectAttributes("id"))
+ .toSeq
.map {
- case (id, name) => GerritProject(id, name)
+ case (name, id) => GerritProject(id, name)
}
+ }
}
case class ProjectContributionSource(name: String, contributorsUrl: String)
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 44c134f..f8c0fab 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -17,7 +17,7 @@
import java.io.{File, FileWriter}
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsRDD, ProjectContributionSource}
+import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s._
@@ -26,12 +26,11 @@
import scala.io.Source
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers
- with SparkTestSupport with Inside {
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
"GerritProjects" should "parse JSON into a GerritProject objects" in {
- val projects = GerritProjectsRDD(Source.fromString(
+ val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromString(
""")]}'
|{
| "All-Projects-name": {
@@ -41,28 +40,25 @@
| "id": "Test-id",
| }
|}
- |""".stripMargin)).collect()
+ |""".stripMargin))
- projects should have size 2
- projects should contain
- allOf(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
+ projects should contain only(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
}
"enrichWithSource" should "enrich project RDD object with its source" in {
val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
- implicit val config = GerritEndpointConfig("http://somewhere.com")
val projectWithSource = projectRdd
- .enrichWithSource
+ .enrichWithSource(projectId => s"http://somewhere.com/$projectId")
.collect
projectWithSource should have size 1
inside(projectWithSource.head) {
case ProjectContributionSource(projectName, url) => {
projectName should be("project-name")
- url should startWith("http://somewhere.com")
+ url shouldBe "http://somewhere.com/project-id"
}
}
}
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index d800069..5f89e78 100644
--- a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -14,14 +14,20 @@
package com.gerritforge.analytics
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, Suite}
-trait SparkTestSupport {
- Logger.getLogger("org").setLevel(Level.ERROR)
- implicit val spark = SparkSession.builder()
+trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
+
+ implicit val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.getOrCreate()
- implicit val sc = spark.sparkContext
- implicit val sql = spark.sqlContext
+
+ implicit lazy val sc: SparkContext = spark.sparkContext
+ implicit lazy val sql: SQLContext = spark.sqlContext
+
+ override protected def afterAll() = {
+ spark.close()
+ }
}
\ No newline at end of file