Fix inversion of name and ID when parsing project list API response

Project name and code were swapped in the code reading it.
Done some code refactoring too

Change-Id: Ic212a890ca577bdf9969713fc0f775297af580a5
Jira-Id: GERICS-633
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 7bcecb9..846fb36 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -23,17 +23,17 @@
 import com.gerritforge.analytics.model._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{udf, _}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 import scala.collection.JavaConverters._
 
 object GerritAnalyticsTransformations {
 
-  implicit class PimpedRDDString(val rdd: RDD[GerritProject]) extends AnyVal {
+  implicit class PimpedGerritProjectRDD(val rdd: RDD[GerritProject]) extends AnyVal {
 
-    def enrichWithSource(implicit config: GerritEndpointConfig): RDD[ProjectContributionSource] = {
+    def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => String): RDD[ProjectContributionSource] = {
       rdd.map { project =>
-        ProjectContributionSource(project.name, config.contributorsUrl(project.id))
+        ProjectContributionSource(project.name, projectToContributorsAnalyticsUrlFactory(project.id))
       }
     }
   }
@@ -68,7 +68,7 @@
 
   case class CommitterInfo(author: String, email_alias: String)
 
-  case class CommitInfo(sha1: String, date: Long, isMerge: Boolean)
+  case class CommitInfo(sha1: String, date: Long, merge: Boolean)
 
   case class UserActivitySummary(year: Integer,
                                  month: Integer,
@@ -129,6 +129,14 @@
 
     def addOrganization()(implicit spark: SparkSession): DataFrame =
       df.withColumn("organization", emailToDomainUdf(col("email")))
+
+
+    def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession) : DataFrame = {
+      df
+        .addOrganization()
+        .handleAliases(aliasesDFMaybe)
+        .convertDates("last_commit_date")
+    }
   }
 
   private def emailToDomain(email: String): String = email match {
@@ -158,4 +166,14 @@
       ZoneOffset.UTC, ZoneId.of("Z")
     ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
 
+  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => String)(implicit spark: SparkSession) = {
+    import spark.sqlContext.implicits._ // toDF
+
+    projects
+      .enrichWithSource(projectToContributorsAnalyticsUrlFactory)
+      .fetchRawContributors
+      .toDF("project", "json")
+      .transformCommitterInfo
+  }
+
 }
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 9f65497..1ce5b73 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,17 +14,40 @@
 
 package com.gerritforge.analytics.job
 
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsRDD}
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneId}
+
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import scopt.Read.reads
+import scopt.{OptionParser, Read}
 
 import scala.io.{Codec, Source}
+import scala.util.control.NonFatal
 
 object Main extends App with Job with LazyLogging {
 
-  new scopt.OptionParser[GerritEndpointConfig]("scopt") {
+  private val fileExists: String => Either[String, Unit] = { path =>
+    if (!new java.io.File(path).exists) {
+      Left(s"ERROR: Path '$path' doesn't exists!")
+    } else {
+      Right()
+    }
+  }
+
+  implicit val localDateRead: Read[LocalDate] = reads { str =>
+    val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+    try {
+      LocalDate.parse(str, format)
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(s"Invalid date '$str' expected format is '$format'")
+    }
+  }
+
+  private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
     head("scopt", "3.x")
     opt[String]('u', "url") optional() action { (x, c) =>
       c.copy(baseUrl = x)
@@ -38,23 +61,22 @@
     opt[String]('e', "elasticIndex") optional() action { (x, c) =>
       c.copy(elasticIndex = Some(x))
     } text "output directory"
-    opt[String]('s', "since") optional() action { (x, c) =>
+    opt[LocalDate]('s', "since") optional() action { (x, c) =>
       c.copy(since = Some(x))
     } text "begin date "
-    opt[String]('u', "until") optional() action { (x, c) =>
+    opt[LocalDate]('u', "until") optional() action { (x, c) =>
       c.copy(until = Some(x))
     } text "since date"
     opt[String]('g', "aggregate") optional() action { (x, c) =>
       c.copy(aggregate = Some(x))
     } text "aggregate email/email_hour/email_day/email_month/email_year"
-    opt[String]('a', "email-aliases") optional() action { (path, c) =>
-      if (!new java.io.File(path).exists) {
-        println(s"ERROR: Path '${path}' doesn't exists!")
-        System.exit(1)
-      }
+
+    opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
       c.copy(emailAlias = Some(path))
     } text "\"emails to author alias\" input data path"
-  }.parse(args, GerritEndpointConfig()) match {
+  }
+
+  cliOptionParser.parse(args, GerritEndpointConfig()) match {
     case Some(config) =>
       implicit val spark: SparkSession = SparkSession.builder()
         .appName("Gerrit Analytics ETL")
@@ -64,7 +86,7 @@
 
       logger.info(s"Starting analytics app with config $config")
 
-      val dataFrame = run()
+      val dataFrame = buildProjectStats().cache() //This dataframe is written twice
 
       logger.info(s"ES content created, saving it to '${config.outputDir}'")
       dataFrame.write.json(config.outputDir)
@@ -78,22 +100,19 @@
 trait Job { self: LazyLogging =>
   implicit val codec = Codec.ISO8859
 
-  def run()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
-    import spark.sqlContext.implicits._ // toDF
+  def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+    import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+
     implicit val sc: SparkContext = spark.sparkContext
 
-    val projects = GerritProjectsRDD(Source.fromURL(config.gerritProjectsUrl))
+    val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
+
+    logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
 
     val aliasesDF = getAliasDF(config.emailAlias)
 
-    projects
-      .enrichWithSource
-      .fetchRawContributors
-      .toDF("project", "json")
-      .transformCommitterInfo
-      .addOrganization()
-      .handleAliases(aliasesDF)
-      .convertDates("last_commit_date")
+    getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), config.contributorsUrl)
+      .dashboardStats(aliasesDF)
 
   }
 
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 8134544..316847f 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -14,12 +14,15 @@
 
 package com.gerritforge.analytics.model
 
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneId}
+
 case class GerritEndpointConfig(baseUrl: String = "",
                                 prefix: Option[String] = None,
                                 outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
                                 elasticIndex: Option[String] = None,
-                                since: Option[String] = None,
-                                until: Option[String] = None,
+                                since: Option[LocalDate] = None,
+                                until: Option[LocalDate] = None,
                                 aggregate: Option[String] = None,
                                 emailAlias: Option[String] = None) {
 
@@ -31,7 +34,9 @@
     }
   }
 
-  val queryString = Seq("since" -> since, "until" -> until, "aggregate" -> aggregate)
+  @transient
+  private lazy val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+  val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
     .flatMap(queryOpt).mkString("?", "&", "")
 
   def contributorsUrl(projectName: String) =
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
index ad8ca50..522870c 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
@@ -14,29 +14,27 @@
 
 package com.gerritforge.analytics.model
 
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
 import org.json4s.native.JsonMethods.parse
 
 import scala.io.Source
 
 case class GerritProject(id: String, name: String)
 
-object GerritProjectsRDD {
+object GerritProjectsSupport {
 
   val GERRIT_PREFIX = ")]}'\n"
   private val GERRIT_PREFIX_LEN = GERRIT_PREFIX.length
 
-  def apply(jsonSource: Source)(implicit sc: SparkContext): RDD[GerritProject] =
-    sc.parallelize(
-      parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
-        .values
-        .asInstanceOf[Map[String, Map[String, String]]]
-        .mapValues(_ ("id"))
-        .toSeq)
+  def parseJsonProjectListResponse(jsonSource: Source): Seq[GerritProject] = {
+    parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
+      .values
+      .asInstanceOf[Map[String, Map[String, String]]]
+      .mapValues(projectAttributes => projectAttributes("id"))
+      .toSeq
       .map {
-        case (id, name) => GerritProject(id, name)
+        case (name, id) => GerritProject(id, name)
       }
+  }
 }
 
 case class ProjectContributionSource(name: String, contributorsUrl: String)
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 44c134f..f8c0fab 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -17,7 +17,7 @@
 import java.io.{File, FileWriter}
 
 import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsRDD, ProjectContributionSource}
+import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
 import org.apache.spark.sql.Row
 import org.json4s.JsonDSL._
 import org.json4s._
@@ -26,12 +26,11 @@
 
 import scala.io.Source
 
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers
-  with SparkTestSupport with Inside {
+class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
 
   "GerritProjects" should "parse JSON into a GerritProject objects" in {
 
-    val projects = GerritProjectsRDD(Source.fromString(
+    val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromString(
       """)]}'
         |{
         | "All-Projects-name": {
@@ -41,28 +40,25 @@
         |   "id": "Test-id",
         | }
         |}
-        |""".stripMargin)).collect()
+        |""".stripMargin))
 
-    projects should have size 2
-    projects should contain
-    allOf(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
+    projects should contain only(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
   }
 
 
   "enrichWithSource" should "enrich project RDD object with its source" in {
 
     val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
-    implicit val config = GerritEndpointConfig("http://somewhere.com")
 
     val projectWithSource = projectRdd
-      .enrichWithSource
+      .enrichWithSource(projectId => s"http://somewhere.com/$projectId")
       .collect
 
     projectWithSource should have size 1
     inside(projectWithSource.head) {
       case ProjectContributionSource(projectName, url) => {
         projectName should be("project-name")
-        url should startWith("http://somewhere.com")
+        url shouldBe "http://somewhere.com/project-id"
       }
     }
   }
diff --git a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index d800069..5f89e78 100644
--- a/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -14,14 +14,20 @@
 
 package com.gerritforge.analytics
 
-import org.apache.log4j.{Level, Logger}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, Suite}
 
-trait SparkTestSupport {
-  Logger.getLogger("org").setLevel(Level.ERROR)
-  implicit val spark = SparkSession.builder()
+trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
+
+  implicit val spark : SparkSession = SparkSession.builder()
     .master("local[4]")
     .getOrCreate()
-  implicit val sc = spark.sparkContext
-  implicit val sql = spark.sqlContext
+
+  implicit lazy val sc: SparkContext = spark.sparkContext
+  implicit lazy val sql: SQLContext = spark.sqlContext
+
+  override protected def afterAll() = {
+    spark.close()
+  }
 }
\ No newline at end of file