Gerrit analytics ETL job

Extracts committer data using the analytics plugin and inserts it into
Elasticsearch.

Accepts the same parameters as the gerrit analytics plugin:
--since --until --aggregate

Additionally understands:
--url to access a gerrit repo
[--out] to specify an intermediate output folder for the generated RDD
(useful for debugging)
[--elasticIndex] to specify the index/type to use
Change-Id: I62440b3cb4f2f3f3b2346eda9f62d81188264d8e
diff --git a/.gitignore b/.gitignore
index 9c07d4a..52cb35b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,18 @@
*.class
*.log
+
+# sbt specific
+.cache
+.history
+.lib/
+.idea/
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+
+# Scala-IDE specific
+.scala_dependencies
+.worksheet
diff --git a/README.md b/README.md
index f897d75..c9de667 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,31 @@
# spark-gerrit-analytics-etl
-Spark ETL to extra analytics data from Gerrit Projects
+Spark ETL to extract analytics data from Gerrit projects.
+
+The job can be launched with the following parameters:
+
+```
+bin/spark-submit \
+ --conf spark.es.nodes=company.com \
+  $JARS/GerritAnalytics-assembly-1.0.jar \
+ --since 2000-06-01 \
+ --aggregate email_hour \
+ --url http://localhost:8080 \
+ -e gerrit/analytics
+```
+### Parameters
+- since, until, aggregate are the same as those defined in the Gerrit
+  Analytics plugin, see: https://gerrit.googlesource.com/plugins/analytics/+/master/README.md
+- -u --url location/port of the Gerrit server to extract the analytics data from
+- -e --elasticIndex the <index>/<type> to be loaded into Elasticsearch;
+  if not provided, no ES export will be performed
+- -o --out folder location for storing the output as JSON files (see the
+  sample record below); if not provided, data is saved to
+  <tmpdir>/analytics-<NNNN>, where <tmpdir> is the system temporary directory
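+
+Each output record is the contributor JSON returned by the analytics
+plugin, enriched with the project name. An illustrative sample (the exact
+fields are those emitted by the analytics plugin):
+
+```
+{"project":"myProject","name":"John Doe","email":"john@email.com","num_commits":1}
+```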
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..a332f49
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,22 @@
+name := "GerritAnalytics"
+
+version := "1.0"
+
+scalaVersion := "2.11.8"
+
+libraryDependencies ++= Seq(
+ "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"
+ exclude("org.spark-project.spark", "unused"),
+ "org.elasticsearch" %% "elasticsearch-spark-20" % "5.0.2"
+ excludeAll ExclusionRule(organization = "org.apache.spark"),
+
+ // fixed versions for apache spark 2.1.1
+ "org.json4s" %% "json4s-native" % "3.2.11",
+ "joda-time" % "joda-time" % "2.9.3",
+
+ "com.github.scopt" %% "scopt" % "3.6.0",
+ "org.scalactic" %% "scalactic" % "3.0.1" % "test",
+ "org.scalatest" %% "scalatest" % "3.0.1" % "test"
+)
+
+mainClass in (Compile, run) := Some("com.gerritforge.analytics.job.Main")
\ No newline at end of file
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..74adde3
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
new file mode 100644
index 0000000..8756178
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
@@ -0,0 +1,81 @@
+package com.gerritforge.analytics.engine
+
+import java.io.{BufferedReader, IOException, InputStreamReader}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
+import java.time.format.DateTimeFormatter
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import org.apache.spark.rdd.RDD
+import org.json4s.JsonAST.{JField, JInt, JString}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+import scala.collection.JavaConverters._
+
+object GerritAnalyticsTrasformations {
+
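+  // The analytics plugin serializes dates as epoch millis; converting them
+  // to ISO-8601 strings lets downstream consumers (e.g. Elasticsearch) map them as dates.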
+ private[analytics] def longDateToISO(in: Number): String =
+ ZonedDateTime.ofInstant(
+ LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
+ ZoneOffset.UTC, ZoneId.of("Z")
+ ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
+
+ private[analytics] def transformLongDateToISO(in: String): JObject = {
+ parse(in).transformField {
+ case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
+ JField(fieldName, JString(longDateToISO(v)))
+ }.asInstanceOf[JObject]
+ }
+
+ def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
+ val is = new URL(sourceUrl).openConnection.getInputStream
+ new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
+ .lines.iterator().asScala
+ .filterNot(_.trim.isEmpty)
+ .map(transformLongDateToISO)
+ .map(ProjectContribution(projectName, _))
+
+ }
+
+ implicit class PimpedRddOfProjectContributionSource(val rdd: RDD[ProjectContributionSource]) extends AnyVal {
+
+ def fetchContributors: RDD[ProjectContribution] = {
+ rdd.flatMap {
+ case ProjectContributionSource(projectName, sourceUrl) =>
+ try {
+ getFileContentAsProjectContributions(sourceUrl, projectName)
+ } catch {
+            case _: IOException => Iterator.empty // skip sources that cannot be read
+ }
+ }
+ }
+ }
+
+ implicit class PimpedRddOfProjects(val rdd: RDD[String]) extends AnyVal {
+
+ def enrichWithSource(config: GerritEndpointConfig) = {
+ rdd.map { projectName =>
+ ProjectContributionSource(projectName, config.contributorsUrl(projectName))
+ }
+ }
+ }
+
+ implicit class PimpedRddOfProjects2Json(val rdd: RDD[ProjectContribution]) extends AnyVal {
+ def toJson() = {
+ rdd.map(pc =>
+ compact(render(
+ ("project" -> pc.projectName) ~ pc.authorContribution)
+ )
+ )
+ }
+ }
+
+}
+
+
+
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
new file mode 100644
index 0000000..ad1daca
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -0,0 +1,60 @@
+package com.gerritforge.analytics.job
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContribution}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.io.{Codec, Source}
+import org.elasticsearch.spark._
+
+object Main extends App with Job {
+
+  new scopt.OptionParser[GerritEndpointConfig]("spark-gerrit-analytics-etl") {
+    head("spark-gerrit-analytics-etl", "1.0")
+ opt[String]('u', "url") optional() action { (x, c) =>
+ c.copy(baseUrl = x)
+ } text "gerrit url"
+ opt[String]('o', "out") optional() action { (x, c) =>
+ c.copy(outputDir = x)
+ } text "output directory"
+ opt[String]('e', "elasticIndex") optional() action { (x, c) =>
+ c.copy(elasticIndex = Some(x))
+ } text "output directory"
+    opt[String]('s', "since") optional() action { (x, c) =>
+      c.copy(since = Some(x))
+    } text "begin date"
+    opt[String]("until") optional() action { (x, c) =>
+      c.copy(until = Some(x))
+    } text "end date"
+ opt[String]('g', "aggregate") optional() action { (x, c) =>
+ c.copy(aggregate = Some(x))
+ } text "aggregate email/email_hour/email_day/email_month/email_year"
+ }.parse(args, GerritEndpointConfig()) match {
+ case Some(config) =>
+ val sparkConf = new SparkConf().setAppName("Gerrit Analytics ETL")
+ val sc = new SparkContext(sparkConf)
+
+ val outRDD = run(config, sc)
+ outRDD.saveAsTextFile(config.outputDir)
+        saveES(config, outRDD)
+
+
+ case None => // invalid configuration usage has been displayed
+ }
+}
+
+trait Job {
+  implicit val codec = Codec.UTF8
+
+ import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+
+ def run(implicit config: GerritEndpointConfig, sc: SparkContext): RDD[ProjectContribution] = {
+ val rdd: RDD[String] = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
+
+ rdd.enrichWithSource(config).fetchContributors
+ }
+ def saveES(implicit config: GerritEndpointConfig, rdd: RDD[ProjectContribution]) = {
+    config.elasticIndex.foreach(rdd.toJson().saveJsonToEs(_))
+ }
+}
+
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
new file mode 100644
index 0000000..5e14fdf
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -0,0 +1,23 @@
+package com.gerritforge.analytics.model
+
+case class GerritEndpointConfig(baseUrl: String = "",
+                                outputDir: String = s"${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
+ elasticIndex: Option[String] = None,
+ since: Option[String] = None,
+ until: Option[String] = None,
+ aggregate: Option[String] = None) {
+
+ def queryOpt(opt: (String, Option[String])): Option[String] = {
+ opt match {
+ case (name: String, value: Option[String]) => value.map(name + "=" + _)
+ }
+ }
+
+ val queryString = Seq("since" -> since, "until" -> until, "aggregate" -> aggregate)
+ .flatMap(queryOpt).mkString("?", "&", "")
+
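+  // e.g. contributorsUrl("myProject") yields
+  //   "<baseUrl>/projects/myProject/analytics~contributors?since=...&until=...&aggregate=..."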
+ def contributorsUrl(projectName: String) =
+ s"${baseUrl}/projects/$projectName/analytics~contributors${queryString}"
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
new file mode 100644
index 0000000..35bab23
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
@@ -0,0 +1,30 @@
+package com.gerritforge.analytics.model
+
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+import scala.io.Source
+
+/**
+ * Created by lucamilanesio on 22/08/2017.
+ */
+object GerritProjects {
+
+ type GerritProjects = Seq[String]
+
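+  // Gerrit prepends the magic ")]}'" line to its REST JSON responses to
+  // prevent XSSI; it has to be stripped before the payload can be parsed.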
+ val GERRIT_PREFIX_LEN = ")]}'\n".length
+
+ def apply(jsonSource: Source) =
+ parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
+ .asInstanceOf[JObject]
+ .values
+ .keys
+ .toSeq
+}
+
+case class ProjectContributionSource(name: String, contributorsUrl: String)
+
+case class ProjectContribution(projectName: String, authorContribution: JObject)
+
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
new file mode 100644
index 0000000..e26013e
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -0,0 +1,95 @@
+package com.gerritforge.analytics
+
+import java.io.{File, FileWriter}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{FlatSpec, Inside, Matchers}
+
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
+
+ import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+
+ "A project" should "be enriched with its contributors URL endpoint" in {
+ withSparkContext { sc =>
+ val projectRdd = sc.parallelize(Seq("project"))
+ val config = GerritEndpointConfig("http://somewhere.com")
+
+ val projectWithSource = projectRdd.enrichWithSource(config).collect
+
+ projectWithSource should have size 1
+
+ inside(projectWithSource.head) {
+ case ProjectContributionSource(project, url) => {
+ project should be("project")
+ url should startWith("http://somewhere.com")
+ }
+ }
+ }
+ }
+
+ it should "return one line enriched with its contributor" in {
+ val contributorsJson: JObject = ("foo" -> "bar")
+ val projectSource = ProjectContributionSource("project", newSource(contributorsJson))
+
+ withSparkContext { sc =>
+ val projectContributions = sc.parallelize(Seq(projectSource))
+ .fetchContributors
+ .collect()
+
+ projectContributions should contain(
+ ProjectContribution("project", contributorsJson)
+ )
+ }
+ }
+
+ it should "return two lines enriched with its two contributors" in {
+ val contributor1: JObject = ("foo" -> "bar")
+ val contributor2: JObject = ("first" -> "John")
+ val projectSource = ProjectContributionSource("project", newSource(contributor1, contributor2))
+
+ withSparkContext { sc =>
+ val projectContributions = sc.parallelize(Seq(projectSource))
+ .fetchContributors
+ .collect()
+
+ projectContributions should contain allOf(
+ ProjectContribution("project", contributor1),
+ ProjectContribution("project", contributor2)
+ )
+ }
+ }
+
+ "A ProjectContribution RDD" should "serialize to Json" in {
+ val pc1 = ProjectContribution("project", ("foo" -> "bar"))
+ val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
+ withSparkContext { sc =>
+ sc.parallelize(Seq(pc1,pc2)).toJson.collect should contain allOf(
+ """{"project":"project","foo":"bar"}""",
+ """{"project":"project","foo2":"bar2"}""")
+ }
+ }
+ "getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
+ val contributor1: JObject = ("foo" -> "bar")
+ val contributor2: JObject = ("first" -> "A with macron in unicode is: \u0100")
+ val url = newSource(contributor1, contributor2)
+ val projectContributions = getFileContentAsProjectContributions(url,"project").toArray
+
+ projectContributions should contain allOf(
+ ProjectContribution("project", contributor1),
+ ProjectContribution("project", contributor2)
+ )
+ }
+
+ private def newSource(contributorsJson: JObject*): String = {
+    val tmpFile = File.createTempFile(s"${getClass.getName}-${System.nanoTime()}",
+      ".json")
+
+ val out = new FileWriter(tmpFile)
+ contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
+ out.close
+ tmpFile.toURI.toString
+ }
+}
diff --git a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
new file mode 100644
index 0000000..b2dc81f
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
@@ -0,0 +1,29 @@
+package com.gerritforge.analytics
+
+import com.gerritforge.analytics.model.GerritProjects
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.io.{Codec, Source}
+
+class GerritProjectsSpec extends FlatSpec with Matchers {
+
+ "GerritProjects" should "parseProjects" in {
+ val projectNames = GerritProjects(Source.fromString(
+ """)]}'
+ |{
+ | "All-Projects": {
+ | "id": "All-Projects",
+ | "state": "ACTIVE"
+ | },
+ | "Test": {
+ | "id": "Test",
+ | "state": "ACTIVE"
+ | }
+ |}
+ |""".stripMargin))
+
+ projectNames should have size 2
+ projectNames should contain allOf("All-Projects", "Test")
+ }
+
+}
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
new file mode 100644
index 0000000..6a596ba
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -0,0 +1,18 @@
+package com.gerritforge.analytics
+
+import org.apache.commons.lang.RandomStringUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+trait SparkTestSupport {
+
+ def withSparkContext(test: SparkContext => Unit): Unit = {
+ val sc = new SparkContext("local[4]", RandomStringUtils.randomAlphabetic(10))
+ try {
+ test(sc)
+ } finally {
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
new file mode 100644
index 0000000..c6eee80
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
@@ -0,0 +1,32 @@
+package com.gerritforge.analytics.engine
+
+import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+import org.json4s.JsonAST.{JArray, JString}
+import org.json4s.native.JsonMethods.{compact, render}
+import org.scalatest.matchers.{MatchResult, Matcher}
+import org.scalatest.{FlatSpec, Inside, Matchers}
+
+class DateConverterTest extends FlatSpec with Matchers with Inside {
+
+ "converter" should "convert 3 canonical numbers in a nested json object" in {
+ val DATES = Map(
+ 0L -> "1970-01-01T00:00:00Z",
+ 1500000000000L -> "2017-07-14T02:40:00Z",
+ 1600000000000L -> "2020-09-13T12:26:40Z")
+ val json =
+ """{
+ "date": 0,
+ "name": "foo",
+ "commits": [
+ {"sha1": "xxx", "last_commit_date": 1500000000000},
+ {"sha1": "yyy", "last_commit_date": 1600000000000}
+        ]
+ }"""
+ val out = transformLongDateToISO(json)
+ inside (out \ "date") { case JString(s0) => s0 should equal(DATES(0L)) }
+ inside (out \ "commits") { case JArray(List(o1,o2)) =>
+ inside (o1 \ "last_commit_date") { case JString(s15) => s15 should equal(DATES(1500000000000L))}
+ inside (o2 \ "last_commit_date") { case JString(s16) => s16 should equal(DATES(1600000000000L))}
+ }
+ }
+}