Gerrit analytics ETL job.

Extracts committer data using the analytics plugin and inserts it into
Elastic Search.

Accepts the --since, --until and --aggregate parameters with the same
meaning as in the Gerrit analytics plugin. Additionally understands:

--url to access a Gerrit repo
[--out] to specify an intermediate output folder for the generated RDD
(useful for debugging)
[--elasticIndex] to specify the index/type to use

Change-Id: I62440b3cb4f2f3f3b2346eda9f62d81188264d8e
diff --git a/.gitignore b/.gitignore
index 9c07d4a..52cb35b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,18 @@
 *.class
 *.log
+
+# sbt specific
+.cache
+.history
+.lib/
+.idea/
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+
+# Scala-IDE specific
+.scala_dependencies
+.worksheet
diff --git a/README.md b/README.md
index f897d75..c9de667 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,23 @@
 # spark-gerrit-analytics-etl
-Spark ETL to extra analytics data from Gerrit Projects
+Spark ETL to extract analytics data from Gerrit Projects.
+
+The job can be launched with the following parameters:
+
+```
+bin/spark-submit \
+    --conf spark.es.nodes=company.com \
+    $JARS/GerritAnalytics-assembly-1.0.jar \
+    --since 2000-06-01 \
+    --aggregate email_hour \
+    --url http://localhost:8080 \
+    -e gerrit/analytics
+```
+### Parameters
+- since, until, aggregate are the same as defined in the Gerrit Analytics plugin,
+    see: https://gerrit.googlesource.com/plugins/analytics/+/master/README.md
+- -u --url location/port of the Gerrit server to extract the analytics data from
+- -e --elasticIndex the <index>/<type> to be loaded into Elastic Search;
+    if not provided, no ES export will be performed
+- -o --out folder location for storing the output as JSON files;
+    if not provided, data is saved to </tmp>/analytics-<NNNN> where </tmp> is
+    the system temporary directory
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..a332f49
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,22 @@
+name := "GerritAnalytics"
+
+version := "1.0"
+
+scalaVersion := "2.11.8"
+
+libraryDependencies ++= Seq(
+  "org.apache.spark" %% "spark-core" % "2.1.1" % "provided"
+    exclude("org.spark-project.spark", "unused"),
+  "org.elasticsearch" %% "elasticsearch-spark-20" % "5.0.2"
+    excludeAll ExclusionRule(organization = "org.apache.spark"),
+
+  // fixed versions for apache spark 2.1.1
+  "org.json4s" %% "json4s-native" % "3.2.11",
+  "joda-time" % "joda-time" % "2.9.3",
+
+  "com.github.scopt" %% "scopt" % "3.6.0",
+  "org.scalactic" %% "scalactic" % "3.0.1" % "test",
+  "org.scalatest" %% "scalatest" % "3.0.1" % "test"
+)
+
+mainClass in (Compile,run) := Some("com.gerritforge.analytics.job.Main")
\ No newline at end of file
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..74adde3
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
new file mode 100644
index 0000000..8756178
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTrasformations.scala
@@ -0,0 +1,79 @@
+package com.gerritforge.analytics.engine
+
+import java.io.{BufferedReader, IOException, InputStreamReader}
+import java.net.URL
+import java.nio.charset.StandardCharsets
+import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
+import java.time.format.DateTimeFormatter
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import org.apache.spark.rdd.RDD
+import org.json4s.JsonAST.{JField, JInt, JString}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+import scala.collection.JavaConverters._
+
+object GerritAnalyticsTrasformations {
+
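+  // Converts an epoch timestamp in milliseconds (as emitted by the analytics plugin)
+  // to an ISO-8601 UTC string, e.g. 0L -> "1970-01-01T00:00:00Z".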
+  private[analytics] def longDateToISO(in: Number): String =
+    ZonedDateTime.ofInstant(
+      LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
+      ZoneOffset.UTC, ZoneId.of("Z")
+    ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
+
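+  // Parses a contributor JSON record and rewrites its "date" and "last_commit_date"
+  // fields from epoch milliseconds to ISO-8601 strings.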
+  private[analytics] def transformLongDateToISO(in: String): JObject = {
+    parse(in).transformField {
+      case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
+        JField(fieldName, JString(longDateToISO(v)))
+    }.asInstanceOf[JObject]
+  }
+
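+  // Streams the newline-delimited JSON returned by the contributors endpoint at sourceUrl
+  // and maps each non-empty line to a ProjectContribution for projectName.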
+  def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
+    val is = new URL(sourceUrl).openConnection.getInputStream
+    new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
+      .lines.iterator().asScala
+      .filterNot(_.trim.isEmpty)
+      .map(transformLongDateToISO)
+      .map(ProjectContribution(projectName, _))
+
+  }
+
+  implicit class PimpedRddOfProjectContributionSource(val rdd: RDD[ProjectContributionSource]) extends AnyVal {
+
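+    // Fetches and parses the contributors of every project source;
+    // sources failing with an IOException are silently skipped.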
+    def fetchContributors: RDD[ProjectContribution] = {
+      rdd.flatMap {
+        case ProjectContributionSource(projectName, sourceUrl) =>
+          try {
+            getFileContentAsProjectContributions(sourceUrl, projectName)
+          } catch {
+            case ioex: IOException => None
+          }
+      }
+    }
+  }
+
+  implicit class PimpedRddOfProjects(val rdd: RDD[String]) extends AnyVal {
+
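+    // Pairs each project name with the URL of its analytics~contributors endpoint.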
+    def enrichWithSource(config: GerritEndpointConfig) = {
+      rdd.map { projectName =>
+        ProjectContributionSource(projectName, config.contributorsUrl(projectName))
+      }
+    }
+  }
+
+  implicit class PimpedRddOfProjects2Json(val rdd: RDD[ProjectContribution]) extends AnyVal {
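+    // Serializes each ProjectContribution to a compact JSON string, merging the
+    // project name into the contributor record.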
+    def toJson() = {
+      rdd.map(pc =>
+        compact(render(
+          ("project" -> pc.projectName) ~ pc.authorContribution)
+        )
+      )
+    }
+  }
+
+}
+
+
+
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
new file mode 100644
index 0000000..ad1daca
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -0,0 +1,60 @@
+package com.gerritforge.analytics.job
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjects, ProjectContribution}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.io.{Codec, Source}
+import org.elasticsearch.spark._
+
+object Main extends App with Job {
+
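+  // Parses the command-line arguments into a GerritEndpointConfig and, if valid,
+  // runs the ETL job and exports the results.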
+  new scopt.OptionParser[GerritEndpointConfig]("scopt") {
+    head("scopt", "3.x")
+    opt[String]('u', "url") optional() action { (x, c) =>
+      c.copy(baseUrl = x)
+    } text "gerrit url"
+    opt[String]('o', "out") optional() action { (x, c) =>
+      c.copy(outputDir = x)
+    } text "output directory"
+    opt[String]('e', "elasticIndex") optional() action { (x, c) =>
+      c.copy(elasticIndex = Some(x))
+    } text "elastic search index/type"
+    opt[String]('s', "since") optional() action { (x, c) =>
+      c.copy(since = Some(x))
+    } text "begin date"
+    opt[String]("until") optional() action { (x, c) =>
+      c.copy(until = Some(x))
+    } text "end date"
+    opt[String]('g', "aggregate") optional() action { (x, c) =>
+      c.copy(aggregate = Some(x))
+    } text "aggregate email/email_hour/email_day/email_month/email_year"
+  }.parse(args, GerritEndpointConfig()) match {
+    case Some(config) =>
+      val sparkConf = new SparkConf().setAppName("Gerrit Analytics ETL")
+      val sc = new SparkContext(sparkConf)
+
+      val outRDD = run(config, sc)
+      outRDD.saveAsTextFile(config.outputDir)
+      saveES(config, outRDD)
+
+
+    case None => // invalid configuration usage has been displayed
+  }
+}
+
+trait Job {
+  implicit val codec = Codec.ISO8859
+
+  import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+
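+  // Fetches the project list from Gerrit and returns an RDD of per-project contributor records.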
+  def run(implicit config: GerritEndpointConfig, sc: SparkContext): RDD[ProjectContribution] = {
+    val rdd: RDD[String] = sc.parallelize(GerritProjects(Source.fromURL(s"${config.baseUrl}/projects/")))
+
+    rdd.enrichWithSource(config).fetchContributors
+  }
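+  // Exports the contributions as JSON documents to the configured Elastic Search index, if one was provided.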
+  def saveES(implicit config: GerritEndpointConfig, rdd: RDD[ProjectContribution]) = {
+      config.elasticIndex.map(rdd.toJson().saveJsonToEs(_))
+  }
+}
+
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
new file mode 100644
index 0000000..5e14fdf
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -0,0 +1,21 @@
+package com.gerritforge.analytics.model
+
+case class GerritEndpointConfig(baseUrl: String = "",
+                                outputDir: String = s"${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
+                                elasticIndex: Option[String] = None,
+                                since: Option[String] = None,
+                                until: Option[String] = None,
+                                aggregate: Option[String] = None) {
+
+  def queryOpt(opt: (String, Option[String])): Option[String] = {
+    val (name, value) = opt
+    value.map(name + "=" + _)
+  }
+
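+  // Query string assembled from the optional filters, e.g. "?since=2000-06-01&aggregate=email_hour".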
+  val queryString = Seq("since" -> since, "until" -> until, "aggregate" -> aggregate)
+    .flatMap(queryOpt).mkString("?", "&", "")
+
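+  // URL of the analytics~contributors endpoint for the given project, including the query string above.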
+  def contributorsUrl(projectName: String) =
+    s"${baseUrl}/projects/$projectName/analytics~contributors${queryString}"
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
new file mode 100644
index 0000000..35bab23
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProjects.scala
@@ -0,0 +1,28 @@
+package com.gerritforge.analytics.model
+
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+import scala.io.Source
+
+/**
+  * Created by lucamilanesio on 22/08/2017.
+  */
+object GerritProjects {
+
+  type GerritProjects = Seq[String]
+
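+  // Gerrit prefixes its JSON responses with ")]}'" to prevent XSSI; skip it before parsing.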
+  val GERRIT_PREFIX_LEN = ")]}'\n".length
+
+  def apply(jsonSource: Source) =
+    parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
+      .asInstanceOf[JObject]
+      .values
+      .keys
+      .toSeq
+}
+
+case class ProjectContributionSource(name: String, contributorsUrl: String)
+
+case class ProjectContribution(projectName: String, authorContribution: JObject)
+
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
new file mode 100644
index 0000000..e26013e
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -0,0 +1,95 @@
+package com.gerritforge.analytics
+
+import java.io.{File, FileWriter}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
+import org.json4s.JsonDSL._
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{FlatSpec, Inside, Matchers}
+
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
+
+  import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+
+  "A project" should "be enriched with its contributors URL endpoint" in {
+    withSparkContext { sc =>
+      val projectRdd = sc.parallelize(Seq("project"))
+      val config = GerritEndpointConfig("http://somewhere.com")
+
+      val projectWithSource = projectRdd.enrichWithSource(config).collect
+
+      projectWithSource should have size 1
+
+      inside(projectWithSource.head) {
+        case ProjectContributionSource(project, url) => {
+          project should be("project")
+          url should startWith("http://somewhere.com")
+        }
+      }
+    }
+  }
+
+  it should "return one line enriched with its contributor" in {
+    val contributorsJson: JObject = ("foo" -> "bar")
+    val projectSource = ProjectContributionSource("project", newSource(contributorsJson))
+
+    withSparkContext { sc =>
+      val projectContributions = sc.parallelize(Seq(projectSource))
+        .fetchContributors
+        .collect()
+
+      projectContributions should contain(
+        ProjectContribution("project", contributorsJson)
+      )
+    }
+  }
+
+  it should "return two lines enriched with its two contributors" in {
+    val contributor1: JObject = ("foo" -> "bar")
+    val contributor2: JObject = ("first" -> "John")
+    val projectSource = ProjectContributionSource("project", newSource(contributor1, contributor2))
+
+    withSparkContext { sc =>
+      val projectContributions = sc.parallelize(Seq(projectSource))
+        .fetchContributors
+        .collect()
+
+      projectContributions should contain allOf(
+        ProjectContribution("project", contributor1),
+        ProjectContribution("project", contributor2)
+      )
+    }
+  }
+
+  "A ProjectContribution RDD" should "serialize to Json" in {
+    val pc1 = ProjectContribution("project", ("foo" -> "bar"))
+    val pc2 = ProjectContribution("project", ("foo2" -> "bar2"))
+    withSparkContext { sc =>
+      sc.parallelize(Seq(pc1,pc2)).toJson.collect should contain allOf(
+      """{"project":"project","foo":"bar"}""",
+      """{"project":"project","foo2":"bar2"}""")
+    }
+  }
+  "getFileContentAsProjectContributions" should "collect contributors and handle utf-8" in {
+    val contributor1: JObject = ("foo" -> "bar")
+    val contributor2: JObject = ("first" -> "A with macron in unicode is: \u0100")
+    val url = newSource(contributor1, contributor2)
+    val projectContributions = getFileContentAsProjectContributions(url,"project").toArray
+
+    projectContributions should contain allOf(
+      ProjectContribution("project", contributor1),
+      ProjectContribution("project", contributor2)
+    )
+  }
+
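+  // Writes the given JSON objects, one per line, to a temporary file and returns its file:// URL.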
+  private def newSource(contributorsJson: JObject*): String = {
+    val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
+      s"${getClass.getName}-${System.nanoTime()}")
+
+    val out = new FileWriter(tmpFile)
+    contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
+    out.close
+    tmpFile.toURI.toString
+  }
+}
diff --git a/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
new file mode 100644
index 0000000..b2dc81f
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/GerritProjectsSpec.scala
@@ -0,0 +1,29 @@
+package com.gerritforge.analytics
+
+import com.gerritforge.analytics.model.GerritProjects
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.io.{Codec, Source}
+
+class GerritProjectsSpec extends FlatSpec with Matchers {
+
+  "GerritProjects" should "parseProjects" in {
+    val projectNames = GerritProjects(Source.fromString(
+      """)]}'
+        |{
+        | "All-Projects": {
+        |   "id": "All-Projects",
+        |   "state": "ACTIVE"
+        | },
+        | "Test": {
+        |   "id": "Test",
+        |   "state": "ACTIVE"
+        | }
+        |}
+        |""".stripMargin))
+
+    projectNames should have size 2
+    projectNames should contain allOf("All-Projects", "Test")
+  }
+
+}
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
new file mode 100644
index 0000000..6a596ba
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -0,0 +1,18 @@
+package com.gerritforge.analytics
+
+import org.apache.commons.lang.RandomStringUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+trait SparkTestSupport {
+
+  def withSparkContext(test: SparkContext => Unit): Unit = {
+    val sc = new SparkContext("local[4]", RandomStringUtils.randomAlphabetic(10))
+    try {
+      test(sc)
+    } finally {
+      sc.stop()
+      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+      System.clearProperty("spark.master.port")
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
new file mode 100644
index 0000000..c6eee80
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/engine/DateConverterTest.scala
@@ -0,0 +1,32 @@
+package com.gerritforge.analytics.engine
+
+import com.gerritforge.analytics.engine.GerritAnalyticsTrasformations._
+import org.json4s.JsonAST.{JArray, JString}
+import org.json4s.native.JsonMethods.{compact, render}
+import org.scalatest.matchers.{MatchResult, Matcher}
+import org.scalatest.{FlatSpec, Inside, Matchers}
+
+class DateConverterTest extends FlatSpec with Matchers with Inside {
+
+  "converter" should "convert 3 canonical numbers in a nested json object" in {
+    val DATES = Map(
+      0L -> "1970-01-01T00:00:00Z",
+      1500000000000L -> "2017-07-14T02:40:00Z",
+      1600000000000L -> "2020-09-13T12:26:40Z")
+    val json =
+      """{
+         "date": 0,
+         "name": "foo",
+         "commits": [
+            {"sha1": "xxx", "last_commit_date": 1500000000000},
+            {"sha1": "yyy", "last_commit_date": 1600000000000}
+          ]
+      }"""
+    val out = transformLongDateToISO(json)
+    inside (out \ "date") { case JString(s0) => s0 should equal(DATES(0L)) }
+    inside (out \ "commits") { case JArray(List(o1,o2)) =>
+      inside (o1 \ "last_commit_date") { case JString(s15) => s15 should equal(DATES(1500000000000L))}
+      inside (o2 \ "last_commit_date") { case JString(s16) => s16 should equal(DATES(1600000000000L))}
+    }
+  }
+}