package com.gerritforge.analytics.plugin

import java.sql.Timestamp
import java.time.LocalDate

import com.gerritforge.analytics.job.{FetchProjects, Job}
import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
import com.google.gerrit.server.project.ProjectControl
import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
import com.google.inject.Inject
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.kohsuke.args4j.{Argument, Option => ArgOption}

import scala.util.{Failure, Success}
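
/**
 * SSH command that starts the analytics ETL: it extracts commit statistics for
 * the given project with an in-process, local-mode Spark job and, when an index
 * is configured, posts the results to Elasticsearch.
 *
 * Assuming the plugin is installed under the name "analytics", a typical
 * invocation would look like:
 *
 *   ssh -p 29418 admin@gerrit-host analytics start PROJECT --since 2016-01-01
 *
 * (plugin name and accepted date format depend on the deployment).
 */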
@CommandMetaData(name = "start", description = "Start the extraction of Gerrit analytics")
class StartCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport,
                             val gerritConfig: GerritConfigSupport)
    extends SshCommand
    with Job
    with DateConversions
    with FetchProjects
    with LazyLogging {

  @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
  var projectControl: ProjectControl = null

  @ArgOption(name = "--elasticIndex", aliases = Array("-e"), usage = "index name")
  var elasticIndex: String = "gerrit/analytics"

  @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
  var beginDate: Timestamp = NO_TIMESTAMP

  @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
  var endDate: Timestamp = NO_TIMESTAMP

  @ArgOption(name = "--aggregate", aliases = Array("-g"),
    usage = "aggregate email/email_hour/email_day/email_month/email_year")
  var aggregate: String = "email_day"

  @ArgOption(name = "--email-aliases", aliases = Array("-a"),
    usage = "\"emails to author alias\" input data path")
  var emailAlias: String = null

  @ArgOption(name = "--extract-branches", aliases = Array("-r"),
    usage = "enables branches extraction for each commit")
  var extractBranches: Boolean = false
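
  /**
   * Builds the endpoint configuration from the parsed options, spins up the
   * Spark session, runs the extraction and reports progress on the command's
   * stdout.
   */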
  override def run() {
    // beginDate/endDate and emailAlias become the Option values expected by
    // GerritEndpointConfig via the implicit conversions in DateConversions.
    implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(),
      prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
      beginDate, endDate, aggregate, emailAlias)

    // Run Spark in local mode: the whole ETL executes inside the Gerrit JVM.
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("analytics-etl")
      .master("local")
      .getOrCreate()

    implicit lazy val sc: SparkContext = spark.sparkContext
    implicit lazy val sql: SQLContext = spark.sqlContext
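
    // Spark and the elasticsearch-hadoop connector resolve classes through the
    // thread context classloader; point it at the plugin's own classloader for
    // the duration of the job and restore the previous one afterwards.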
    val prevClassLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
    try {
      stdout.println(s"Starting new Spark job with parameters: $config")
      stdout.flush()
      val startTs = System.currentTimeMillis

      // Cache the stats DataFrame: it is used twice, once for the row count
      // and once for the Elasticsearch export.
      val projectStats = buildProjectStats().cache()
      val numRows = projectStats.count()

      import org.elasticsearch.spark.sql._
      config.elasticIndex.foreach { esIndex =>
        stdout.println(s"$numRows rows extracted. Posting to Elasticsearch at '$esIndex'")
        stdout.flush()
        projectStats.saveToEs(esIndex)
      }

      val elapsedTs = (System.currentTimeMillis - startTs) / 1000L
      stdout.println(s"Job COMPLETED in $elapsedTs secs")
    } catch {
      case e: Throwable =>
        e.printStackTrace()
        stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
        die(e)
    } finally {
      Thread.currentThread().setContextClassLoader(prevClassLoader)
    }
  }
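
  /**
   * Resolves the project name carried in config.prefix to a GerritProject;
   * a failed lookup is logged as a warning and yields no projects.
   */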
  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
    config.prefix.toSeq.flatMap { projectName =>
      gerritProjects.getProject(projectName) match {
        case Success(project) =>
          Seq(project)
        case Failure(e) =>
          logger.warn(s"Unable to fetch project $projectName", e)
          Seq()
      }
    }
  }
}
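
/**
 * Implicit adapters between raw args4j option values and the Option-based
 * parameters of GerritEndpointConfig: the NO_TIMESTAMP sentinel maps to None,
 * any other Timestamp to its LocalDate, and a nullable String to an Option.
 */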
trait DateConversions {
  val NO_TIMESTAMP = new Timestamp(0L)

  implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
    case NO_TIMESTAMP => None
    case ts => Some(ts.toLocalDateTime.toLocalDate)
  }

  implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
}