Allow the installation of analytics-etl as a Gerrit plugin

Introduce an SSH command to start the extraction of Gerrit analytics
and repackage the fat jar as a Gerrit plugin.
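
Once installed, the extraction can be triggered over SSH; a possible
invocation (host, project and index names are illustrative, and the
accepted date format depends on how args4j parses Timestamp values):

  ssh -p 29418 admin@gerrit.example.com analytics-etl start my-project \
    --since 2018-01-01 --until 2018-03-31 --elasticIndex gerrit/analytics
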
Running the Spark job inside Gerrit requires the Spark libraries to be
linked into the /lib directory beforehand.
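
For example, with a Spark 2.x distribution available on the Gerrit
host (paths are illustrative):

  ln -s /opt/spark/jars/*.jar "$GERRIT_SITE/lib/"
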
Keep the ability to run the job in a Spark cluster by retaining the
main entry point and marking the Spark dependencies as provided.
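
Running the job standalone remains possible along these lines (the URL
is illustrative; the jar name comes from the assemblyJarName setting
in build.sbt):

  spark-submit --class com.gerritforge.analytics.job.Main \
    analytics-etl.jar --url http://gerrit.example.com:8080
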
Bug: Issue 9021
Change-Id: I66a051250242b1534400d6baacad0cdaaf177266
diff --git a/build.sbt b/build.sbt
index 011bdc1..4ce7153 100644
--- a/build.sbt
+++ b/build.sbt
@@ -13,6 +13,10 @@
val sparkVersion = "2.1.1"
+val gerritApiVersion = "2.13.7"
+
+val pluginName = "analytics-etl"
+
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided"
exclude("org.spark-project.spark", "unused"),
@@ -21,6 +25,7 @@
excludeAll ExclusionRule(organization = "org.apache.spark"),
// json4s still needed by GerritProjects
"org.json4s" %% "json4s-native" % "3.2.11",
+ "com.google.gerrit" % "gerrit-plugin-api" % gerritApiVersion % Provided withSources(),
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
@@ -31,6 +36,8 @@
mainClass in (Compile,run) := Some("com.gerritforge.analytics.job.Main")
+assemblyJarName in assembly := s"${name.value}.jar"
+
parallelExecution in Test := false
dockerfile in docker := {
@@ -55,4 +62,13 @@
)
)
-buildOptions in docker := BuildOptions(cache = false)
\ No newline at end of file
+buildOptions in docker := BuildOptions(cache = false)
+
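+// Manifest attributes Gerrit reads when loading the jar as a plugin: the API
+// type, the plugin name and the Guice modules to bind at startup.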
+packageOptions in(Compile, packageBin) += Package.ManifestAttributes(
+ ("Gerrit-ApiType", "plugin"),
+ ("Gerrit-PluginName", pluginName),
+ ("Gerrit-Module", "com.gerritforge.analytics.plugin.Module"),
+ ("Gerrit-SshModule", "com.gerritforge.analytics.plugin.SshModule"),
+ ("Implementation-Title", "Analytics ETL plugin"),
+ ("Implementation-URL", "https://gerrit.googlesource.com/plugins/analytics-etl")
+)
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index b52540f..8aa89d2 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -31,7 +31,7 @@
implicit class PimpedGerritProjectRDD(val rdd: RDD[GerritProject]) extends AnyVal {
- def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => String): RDD[ProjectContributionSource] = {
+ def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => Option[String]): RDD[ProjectContributionSource] = {
rdd.map { project =>
ProjectContributionSource(project.name, projectToContributorsAnalyticsUrlFactory(project.id))
}
@@ -45,7 +45,11 @@
.filterNot(_.trim.isEmpty)
}
- def getProjectJsonContributorsArray(project: String, sourceURL: String): Array[(String, String)] = {
+ def getProjectJsonContributorsArray(project: String, sourceURL: Option[String]): Array[(String, String)] = {
+ sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _))
+ }
+
+ def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String): Array[(String, String)] = {
try {
getLinesFromURL(sourceURL)
.map(s => (project, s))
@@ -187,7 +191,7 @@
ZoneOffset.UTC, ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
- def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => String)(implicit spark: SparkSession) = {
+ def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String])(implicit spark: SparkSession) = {
import spark.sqlContext.implicits._ // toDF
projects
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 3d1c4d4..e6f322d 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -18,7 +18,7 @@
import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
import com.typesafe.scalalogging.LazyLogging
@@ -32,7 +32,7 @@
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
-object Main extends App with Job with LazyLogging {
+object Main extends App with Job with LazyLogging with FetchRemoteProjects {
private val fileExists: String => Either[String, Unit] = { path =>
if (!new java.io.File(path).exists) {
@@ -56,7 +56,7 @@
private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
head("scopt", "3.x")
opt[String]('u', "url") optional() action { (x, c) =>
- c.copy(baseUrl = x)
+ c.copy(baseUrl = Some(x))
} text "gerrit url"
opt[String]('p', "prefix") optional() action { (p, c) =>
c.copy(prefix = Some(p))
@@ -109,8 +109,7 @@
}
}
-trait Job {
- self: LazyLogging =>
+trait Job { self: LazyLogging with FetchProjects =>
implicit val codec = Codec.ISO8859
def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -129,7 +128,7 @@
}
}.getOrElse(AggregationStrategy.aggregateByEmail)
- val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
+ val projects = fetchProjects(config)
logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
@@ -168,13 +167,17 @@
val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
- require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
- s""" Schemas from the stats collected from events and from the analytics datasets differs!!
- | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
- | From gerrit events: ${statsFromEvents.schema}
+ if (statsFromEvents.head(1).isEmpty) {
+ statsFromAnalyticsPlugin
+ } else {
+ require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
+ s""" Schemas from the stats collected from events and from the analytics datasets differs!!
+ | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+ | From gerrit events: ${statsFromEvents.schema}
""".stripMargin)
- (statsFromAnalyticsPlugin union statsFromEvents).dashboardStats(aliasesDF)
+ (statsFromAnalyticsPlugin union statsFromEvents)
+ }.dashboardStats(aliasesDF)
}
def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
@@ -198,3 +201,14 @@
}
}
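+/** Abstracts where the list of projects comes from: the standalone job fetches
+  * it from the Gerrit REST API, while the plugin resolves it in-process. */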
+trait FetchProjects {
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
+}
+
+trait FetchRemoteProjects extends FetchProjects {
+
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+ config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(url)) }
+ }
+}
+
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index c3ea2be..626a1f7 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -19,7 +19,7 @@
import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-case class GerritEndpointConfig(baseUrl: String = "",
+case class GerritEndpointConfig(baseUrl: Option[String] = None,
prefix: Option[String] = None,
outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
elasticIndex: Option[String] = None,
@@ -31,7 +31,7 @@
eventsFailureOutputPath: Option[String] = None
) {
- val gerritProjectsUrl: String = s"${baseUrl}/projects/" + prefix.fold("")("?p=" + _)
+ val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
def queryOpt(opt: (String, Option[String])): Option[String] = {
opt match {
@@ -44,6 +44,6 @@
val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
.flatMap(queryOpt).mkString("?", "&", "")
- def contributorsUrl(projectName: String) =
- s"$baseUrl/projects/$projectName/analytics~contributors$queryString"
+ def contributorsUrl(projectName: String): Option[String] =
+ baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
index 522870c..139791d 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
@@ -14,12 +14,26 @@
package com.gerritforge.analytics.model
+import com.google.gerrit.extensions.api.GerritApi
+import com.google.inject.Inject
import org.json4s.native.JsonMethods.parse
import scala.io.Source
+import scala.util.Try
case class GerritProject(id: String, name: String)
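+/** Injectable variant of the REST-based lookup in the companion object below:
+  * resolves a single project through Gerrit's in-process API. */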
+class GerritProjectsSupport @Inject()(gerritApi: GerritApi) {
+
+ def getProject(projectName: String): Try[GerritProject] = {
+ val projectApi = gerritApi.projects().name(projectName)
+ Try {
+ val project = projectApi.get()
+ GerritProject(project.id, project.name)
+ }
+ }
+}
+
object GerritProjectsSupport {
val GERRIT_PREFIX = ")]}'\n"
@@ -37,4 +51,4 @@
}
}
-case class ProjectContributionSource(name: String, contributorsUrl: String)
\ No newline at end of file
+case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala b/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
new file mode 100644
index 0000000..c980431
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
@@ -0,0 +1,27 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.gerrit.server.config.GerritServerConfig
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.eclipse.jgit.lib.Config
+
+class GerritConfigSupport @Inject()(@GerritServerConfig val cfg: Config) extends LazyLogging {
+
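+  // Derive a loopback HTTP URL from the httpd.listenUrl setting, so that the
+  // job can call the analytics REST endpoint of the local Gerrit directly.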
+ def getListenUrl(): Option[String] = {
+ val listenUrl = cfg.getString("httpd", null, "listenUrl")
+ val portRegex = "(proxy-)?https?://[^:]+:([0-9]+)/(.*)".r
+ listenUrl match {
+ case portRegex(_, port, path) =>
+ val url = s"http://127.0.0.1:$port/$path"
+      if (url.endsWith("/")) {
+ Some(url.dropRight(1))
+ } else {
+ Some(url)
+ }
+ case _ => {
+ logger.warn(s"Unable to extract local Gerrit URL from $listenUrl")
+ None
+ }
+ }
+ }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala b/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
new file mode 100644
index 0000000..a42e44e
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
@@ -0,0 +1,7 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.inject.AbstractModule
+
+class Module extends AbstractModule {
+ override def configure() {}
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala b/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
new file mode 100644
index 0000000..290b1ff
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
@@ -0,0 +1,9 @@
+package com.gerritforge.analytics.plugin
+
+import com.google.gerrit.sshd.PluginCommandModule
+
+class SshModule extends PluginCommandModule {
+ override protected def configureCommands {
+ command(classOf[StartCommand])
+ }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
new file mode 100644
index 0000000..f09fe2d
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
@@ -0,0 +1,107 @@
+package com.gerritforge.analytics.plugin
+
+import java.sql.Timestamp
+import java.time.LocalDate
+
+import com.gerritforge.analytics.engine.events.AggregationStrategy
+import com.gerritforge.analytics.job.{FetchProjects, Job}
+import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
+import com.google.gerrit.server.project.ProjectControl
+import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+import org.kohsuke.args4j.{Argument, Option => ArgOption}
+
+import scala.io.Source
+import scala.util.{Failure, Success}
+
+@CommandMetaData(name = "start", description = "Start the extraction of Gerrit analytics")
+class StartCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport, val gerritConfig: GerritConfigSupport)
+ extends SshCommand with Job with DateConversions with FetchProjects with LazyLogging {
+
+ @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
+ var projectControl: ProjectControl = null
+
+ @ArgOption(name = "--elasticIndex", aliases = Array("-e"),
+ usage = "index name")
+ var elasticIndex: String = null
+
+ @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
+ var beginDate: Timestamp = NO_TIMESTAMP
+
+ @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
+ var endDate: Timestamp = NO_TIMESTAMP
+
+ @ArgOption(name = "--aggregate", aliases = Array("-g"), usage = "aggregate email/email_hour/email_day/email_month/email_year")
+ var aggregate: String = "email_day"
+
+ @ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
+ var emailAlias: String = null
+
+ override def run() {
+ implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
+ beginDate, endDate, aggregate, emailAlias)
+
+ implicit val spark: SparkSession = SparkSession.builder()
+ .appName("analytics-etl")
+ .master("local")
+ .config("key", "value")
+ .getOrCreate()
+
+ implicit lazy val sc: SparkContext = spark.sparkContext
+ implicit lazy val sql: SQLContext = spark.sqlContext
+
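+    // Temporarily switch the thread context classloader to the plugin's own,
+    // so that Spark can resolve classes packaged inside the plugin jar.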
+ val prevClassLoader = Thread.currentThread().getContextClassLoader
+ Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+ try {
+ stdout.println(s"Starting new Spark job with parameters: $config")
+ stdout.flush()
+ val startTs = System.currentTimeMillis
+ val projectStats = buildProjectStats().cache()
+ val numRows = projectStats.count()
+
+ import org.elasticsearch.spark.sql._
+ config.elasticIndex.foreach { esIndex =>
+        stdout.println(s"$numRows rows extracted. Posting to Elasticsearch index '$esIndex'")
+ stdout.flush()
+ projectStats.saveToEs(esIndex)
+ }
+
+      val elapsedTs = (System.currentTimeMillis - startTs) / 1000L
+      stdout.println(s"Job COMPLETED in $elapsedTs secs")
+ } catch {
+ case e: Throwable =>
+ e.printStackTrace()
+ stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
+ die(e)
+ } finally {
+ Thread.currentThread().setContextClassLoader(prevClassLoader)
+ }
+ }
+
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+ config.prefix.toSeq.flatMap(projectName => gerritProjects.getProject(projectName) match {
+ case Success(project) =>
+ Seq(project)
+ case Failure(e) => {
+ logger.warn(s"Unable to fetch project $projectName", e)
+ Seq()
+ }
+ })
+ }
+}
+
+trait DateConversions {
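+  // args4j has no representation for an unset Timestamp option, so the epoch
+  // acts as a "not set" sentinel that is converted to None.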
+ val NO_TIMESTAMP = new Timestamp(0L)
+
+ implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+ case NO_TIMESTAMP => None
+ case ts => Some(ts.toLocalDateTime.toLocalDate)
+ }
+
+ implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
+}
+
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
index f99697f..6cb59d0 100644
--- a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
+++ b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -67,4 +67,14 @@
}
+ trait DateConversions {
+ val NO_TIMESTAMP = new Timestamp(0L)
+
+ implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+ case NO_TIMESTAMP => None
+ case ts => Some(ts.toLocalDateTime.toLocalDate)
+ }
+
+ implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
+ }
}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 0e4c1dd..31d37c4 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -53,14 +53,14 @@
val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
val projectWithSource = projectRdd
- .enrichWithSource(projectId => s"http://somewhere.com/$projectId")
+ .enrichWithSource(projectId => Some(s"http://somewhere.com/$projectId"))
.collect
projectWithSource should have size 1
inside(projectWithSource.head) {
case ProjectContributionSource(projectName, url) => {
projectName should be("project-name")
- url shouldBe "http://somewhere.com/project-id"
+ url should contain("http://somewhere.com/project-id")
}
}
}
@@ -308,13 +308,13 @@
}
- private def newSource(contributorsJson: JObject*): String = {
+ private def newSource(contributorsJson: JObject*): Option[String] = {
val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
s"${getClass.getName}-${System.nanoTime()}")
val out = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)
contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
out.close
- tmpFile.toURI.toString
+ Some(tmpFile.toURI.toString)
}
}
diff --git a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala b/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
index 5603550..591184b 100644
--- a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
+++ b/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
@@ -20,13 +20,13 @@
"gerritProjectsUrl" should "contain prefix when available" in {
val prefix = "prefixMustBeThere"
- val conf = GerritEndpointConfig(baseUrl = "testBaseUrl", prefix = Some(prefix))
- conf.gerritProjectsUrl shouldBe s"testBaseUrl/projects/?p=$prefix"
+ val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = Some(prefix))
+ conf.gerritProjectsUrl should contain(s"testBaseUrl/projects/?p=$prefix")
}
it should "not contain prefix when not available" in {
- val conf = GerritEndpointConfig(baseUrl = "testBaseUrl", prefix = None)
- conf.gerritProjectsUrl shouldBe s"testBaseUrl/projects/"
+ val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = None)
+ conf.gerritProjectsUrl should contain(s"testBaseUrl/projects/")
}
}