Allow the installation of analytics-etl as plugin

Introduce an SSH command to start the extraction of Gerrit Analytics
and re-package the fat-jar as Gerrit plugin.
Running the Spark job inside Gerrit requires the Spark
libraries to be linked into the /lib directory beforehand.

Preserve the ability to run the job in a Spark cluster by
keeping the main entry point and the Spark dependencies as provided.

Bug: Issue 9021
Change-Id: I66a051250242b1534400d6baacad0cdaaf177266
diff --git a/build.sbt b/build.sbt
index 011bdc1..4ce7153 100644
--- a/build.sbt
+++ b/build.sbt
@@ -13,6 +13,10 @@
 
 val sparkVersion = "2.1.1"
 
+val gerritApiVersion = "2.13.7"
+
+val pluginName = "analytics-etl"
+
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
     exclude("org.spark-project.spark", "unused"),
@@ -21,6 +25,7 @@
     excludeAll ExclusionRule(organization = "org.apache.spark"),
   // json4s still needed by GerritProjects
   "org.json4s" %% "json4s-native" % "3.2.11",
+  "com.google.gerrit" % "gerrit-plugin-api" % gerritApiVersion % Provided withSources(),
 
   "com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
 
@@ -31,6 +36,8 @@
 
 mainClass in (Compile,run) := Some("com.gerritforge.analytics.job.Main")
 
+assemblyJarName in assembly := s"${name.value}.jar"
+
 parallelExecution in Test := false
 
 dockerfile in docker := {
@@ -55,4 +62,13 @@
   )
 )
 
-buildOptions in docker := BuildOptions(cache = false)
\ No newline at end of file
+buildOptions in docker := BuildOptions(cache = false)
+
+packageOptions in(Compile, packageBin) += Package.ManifestAttributes(
+  ("Gerrit-ApiType", "plugin"),
+  ("Gerrit-PluginName", pluginName),
+  ("Gerrit-Module", "com.gerritforge.analytics.plugin.Module"),
+  ("Gerrit-SshModule", "com.gerritforge.analytics.plugin.SshModule"),
+  ("Implementation-Title", "Analytics ETL plugin"),
+  ("Implementation-URL", "https://gerrit.googlesource.com/plugins/analytics-etl")
+)
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index b52540f..8aa89d2 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -31,7 +31,7 @@
 
   implicit class PimpedGerritProjectRDD(val rdd: RDD[GerritProject]) extends AnyVal {
 
-    def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => String): RDD[ProjectContributionSource] = {
+    def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => Option[String]): RDD[ProjectContributionSource] = {
       rdd.map { project =>
         ProjectContributionSource(project.name, projectToContributorsAnalyticsUrlFactory(project.id))
       }
@@ -45,7 +45,11 @@
       .filterNot(_.trim.isEmpty)
   }
 
-  def getProjectJsonContributorsArray(project: String, sourceURL: String): Array[(String, String)] = {
+  def getProjectJsonContributorsArray(project: String, sourceURL: Option[String]): Array[(String, String)] = {
+    sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _))
+  }
+
+  def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String): Array[(String, String)] = {
     try {
       getLinesFromURL(sourceURL)
         .map(s => (project, s))
@@ -187,7 +191,7 @@
       ZoneOffset.UTC, ZoneId.of("Z")
     ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
 
-  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => String)(implicit spark: SparkSession) = {
+  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String])(implicit spark: SparkSession) = {
     import spark.sqlContext.implicits._ // toDF
 
     projects
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 3d1c4d4..e6f322d 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -18,7 +18,7 @@
 
 import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
 import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
 import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
 import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 import com.typesafe.scalalogging.LazyLogging
@@ -32,7 +32,7 @@
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success}
 
-object Main extends App with Job with LazyLogging {
+object Main extends App with Job with LazyLogging with FetchRemoteProjects {
 
   private val fileExists: String => Either[String, Unit] = { path =>
     if (!new java.io.File(path).exists) {
@@ -56,7 +56,7 @@
   private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
     head("scopt", "3.x")
     opt[String]('u', "url") optional() action { (x, c) =>
-      c.copy(baseUrl = x)
+      c.copy(baseUrl = Some(x))
     } text "gerrit url"
     opt[String]('p', "prefix") optional() action { (p, c) =>
       c.copy(prefix = Some(p))
@@ -109,8 +109,7 @@
   }
 }
 
-trait Job {
-  self: LazyLogging =>
+trait Job { self: LazyLogging with FetchProjects =>
   implicit val codec = Codec.ISO8859
 
   def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -129,7 +128,7 @@
         }
       }.getOrElse(AggregationStrategy.aggregateByEmail)
 
-    val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
+    val projects = fetchProjects(config)
 
     logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
 
@@ -168,13 +167,17 @@
 
     val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
 
-    require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
-      s""" Schemas from the stats collected from events and from the analytics datasets differs!!
-         | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
-         | From gerrit events: ${statsFromEvents.schema}
+    if (statsFromEvents.head(1).isEmpty) {
+      statsFromAnalyticsPlugin
+    } else {
+      require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
+        s""" Schemas from the stats collected from events and from the analytics datasets differs!!
+           | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+           | From gerrit events: ${statsFromEvents.schema}
       """.stripMargin)
 
-    (statsFromAnalyticsPlugin union statsFromEvents).dashboardStats(aliasesDF)
+      (statsFromAnalyticsPlugin union statsFromEvents)
+    }.dashboardStats(aliasesDF)
   }
 
   def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
@@ -198,3 +201,14 @@
   }
 }
 
+trait FetchProjects {
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
+}
+
+trait FetchRemoteProjects extends FetchProjects {
+
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+    config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(url)) }
+  }
+}
+
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index c3ea2be..626a1f7 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -19,7 +19,7 @@
 
 import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 
-case class GerritEndpointConfig(baseUrl: String = "",
+case class GerritEndpointConfig(baseUrl: Option[String] = None,
                                 prefix: Option[String] = None,
                                 outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
                                 elasticIndex: Option[String] = None,
@@ -31,7 +31,7 @@
                                 eventsFailureOutputPath: Option[String] = None
                                ) {
 
-  val gerritProjectsUrl: String = s"${baseUrl}/projects/" + prefix.fold("")("?p=" + _)
+  val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
 
   def queryOpt(opt: (String, Option[String])): Option[String] = {
     opt match {
@@ -44,6 +44,6 @@
   val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
     .flatMap(queryOpt).mkString("?", "&", "")
 
-  def contributorsUrl(projectName: String) =
-    s"$baseUrl/projects/$projectName/analytics~contributors$queryString"
+  def contributorsUrl(projectName: String): Option[String] =
+    baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
 }
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
index 522870c..139791d 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
@@ -14,12 +14,26 @@
 
 package com.gerritforge.analytics.model
 
+import com.google.gerrit.extensions.api.GerritApi
+import com.google.inject.Inject
 import org.json4s.native.JsonMethods.parse
 
 import scala.io.Source
+import scala.util.Try
 
 case class GerritProject(id: String, name: String)
 
+class GerritProjectsSupport @Inject()(gerritApi: GerritApi) {
+
+  def getProject(projectName: String): Try[GerritProject] = {
+    val projectApi = gerritApi.projects().name(projectName)
+    Try {
+      val project = projectApi.get()
+      GerritProject(project.id, project.name)
+    }
+  }
+}
+
 object GerritProjectsSupport {
 
   val GERRIT_PREFIX = ")]}'\n"
@@ -37,4 +51,4 @@
   }
 }
 
-case class ProjectContributionSource(name: String, contributorsUrl: String)
\ No newline at end of file
+case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala b/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
new file mode 100644
index 0000000..c980431
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
@@ -0,0 +1,27 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.gerrit.server.config.GerritServerConfig
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.eclipse.jgit.lib.Config
+
+class GerritConfigSupport @Inject()(@GerritServerConfig val cfg: Config) extends LazyLogging {
+
+  def getListenUrl(): Option[String] = {
+    val listenUrl = cfg.getString("httpd", null, "listenUrl")
+    val portRegex = "(proxy-)?https?://[^:]+:([0-9]+)/(.*)".r
+    listenUrl match {
+      case portRegex(_, port, path) =>
+        val url = s"http://127.0.0.1:$port/$path"
+        if(url.endsWith("/")) {
+          Some(url.dropRight(1))
+        } else {
+          Some(url)
+        }
+      case _ => {
+        logger.warn(s"Unable to extract local Gerrit URL from $listenUrl")
+        None
+      }
+    }
+  }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala b/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
new file mode 100644
index 0000000..a42e44e
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
@@ -0,0 +1,7 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.inject.AbstractModule
+
+class Module extends AbstractModule {
+  override def configure() {}
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala b/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
new file mode 100644
index 0000000..290b1ff
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
@@ -0,0 +1,9 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.gerrit.sshd.PluginCommandModule
+
+class SshModule extends PluginCommandModule {
+  override protected def configureCommands {
+    command(classOf[StartCommand])
+  }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
new file mode 100644
index 0000000..f09fe2d
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
@@ -0,0 +1,107 @@
+package com.gerritforge.analytics.plugin
+
+import java.sql.Timestamp
+import java.time.LocalDate
+
+import com.gerritforge.analytics.engine.events.AggregationStrategy
+import com.gerritforge.analytics.job.{FetchProjects, Job}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
+import com.google.gerrit.server.project.ProjectControl
+import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+import org.kohsuke.args4j.{Argument, Option => ArgOption}
+
+import scala.io.Source
+import scala.util.{Failure, Success}
+
+@CommandMetaData(name = "start", description = "Start the extraction of Gerrit analytics")
+class StartCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport, val gerritConfig: GerritConfigSupport)
+  extends SshCommand with Job with DateConversions with FetchProjects with LazyLogging {
+
+  @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
+  var projectControl: ProjectControl = null
+
+  @ArgOption(name = "--elasticIndex", aliases = Array("-e"),
+    usage = "index name")
+  var elasticIndex: String = null
+
+  @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
+  var beginDate: Timestamp = NO_TIMESTAMP
+
+  @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
+  var endDate: Timestamp = NO_TIMESTAMP
+
+  @ArgOption(name = "--aggregate", aliases = Array("-g"), usage = "aggregate email/email_hour/email_day/email_month/email_year")
+  var aggregate: String = "email_day"
+
+  @ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
+  var emailAlias: String = null
+
+  override def run() {
+    implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
+      beginDate, endDate, aggregate, emailAlias)
+
+    implicit val spark: SparkSession = SparkSession.builder()
+      .appName("analytics-etl")
+      .master("local")
+      .config("key", "value")
+      .getOrCreate()
+
+    implicit lazy val sc: SparkContext = spark.sparkContext
+    implicit lazy val sql: SQLContext = spark.sqlContext
+
+    val prevClassLoader = Thread.currentThread().getContextClassLoader
+    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+    try {
+      stdout.println(s"Starting new Spark job with parameters: $config")
+      stdout.flush()
+      val startTs = System.currentTimeMillis
+      val projectStats = buildProjectStats().cache()
+      val numRows = projectStats.count()
+
+        import org.elasticsearch.spark.sql._
+        config.elasticIndex.foreach { esIndex =>
+          stdout.println(s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}'")
+          stdout.flush()
+          projectStats.saveToEs(esIndex)
+        }
+
+      val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
+      stdout.println(s"Job COMPLETED in $elaspsedTs secs")
+    } catch {
+      case e: Throwable =>
+        e.printStackTrace()
+        stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
+        die(e)
+    } finally {
+      Thread.currentThread().setContextClassLoader(prevClassLoader)
+    }
+  }
+
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+    config.prefix.toSeq.flatMap(projectName => gerritProjects.getProject(projectName) match {
+      case Success(project) =>
+        Seq(project)
+      case Failure(e) => {
+        logger.warn(s"Unable to fetch project $projectName", e)
+        Seq()
+      }
+    })
+  }
+}
+
+trait DateConversions {
+  val NO_TIMESTAMP = new Timestamp(0L)
+
+  implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+    case NO_TIMESTAMP => None
+    case ts => Some(ts.toLocalDateTime.toLocalDate)
+  }
+
+  implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
+}
+
+
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
index f99697f..6cb59d0 100644
--- a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
+++ b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -67,4 +67,14 @@
 
   }
 
+  trait DateConversions {
+    val NO_TIMESTAMP = new Timestamp(0L)
+
+    implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+      case NO_TIMESTAMP => None
+      case ts => Some(ts.toLocalDateTime.toLocalDate)
+    }
+
+    implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
+  }
 }
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 0e4c1dd..31d37c4 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -53,14 +53,14 @@
     val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
 
     val projectWithSource = projectRdd
-      .enrichWithSource(projectId => s"http://somewhere.com/$projectId")
+      .enrichWithSource(projectId => Some(s"http://somewhere.com/$projectId"))
       .collect
 
     projectWithSource should have size 1
     inside(projectWithSource.head) {
       case ProjectContributionSource(projectName, url) => {
         projectName should be("project-name")
-        url shouldBe "http://somewhere.com/project-id"
+        url should contain("http://somewhere.com/project-id")
       }
     }
   }
@@ -308,13 +308,13 @@
 
   }
 
-  private def newSource(contributorsJson: JObject*): String = {
+  private def newSource(contributorsJson: JObject*): Option[String] = {
     val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
       s"${getClass.getName}-${System.nanoTime()}")
 
     val out = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)
     contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
     out.close
-    tmpFile.toURI.toString
+    Some(tmpFile.toURI.toString)
   }
 }
diff --git a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala b/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
index 5603550..591184b 100644
--- a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
+++ b/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
@@ -20,13 +20,13 @@
 
   "gerritProjectsUrl" should "contain prefix when available" in {
     val prefix = "prefixMustBeThere"
-    val conf = GerritEndpointConfig(baseUrl = "testBaseUrl", prefix = Some(prefix))
-    conf.gerritProjectsUrl shouldBe s"testBaseUrl/projects/?p=$prefix"
+    val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = Some(prefix))
+    conf.gerritProjectsUrl should contain(s"testBaseUrl/projects/?p=$prefix")
   }
 
   it should "not contain prefix when not available" in {
-    val conf = GerritEndpointConfig(baseUrl = "testBaseUrl", prefix = None)
-    conf.gerritProjectsUrl shouldBe s"testBaseUrl/projects/"
+    val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = None)
+    conf.gerritProjectsUrl should contain(s"testBaseUrl/projects/")
   }
 
 }