// blob: 996a5536fbcf11ff1206a72e060f295fe4002f79 [file] [log] [blame]
// Copyright (C) 2017 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.gitcommits.job
import java.time.LocalDate
import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
import com.gerritforge.analytics.gitcommits.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
import com.gerritforge.analytics.spark.SparkApp
import com.gerritforge.analytics.support.ops.ReadsOps._
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import scopt.OptionParser
import scala.io.Codec
import scala.util.{Failure, Success}
/** Command-line entry point of the Gerrit git-commits analytics ETL.
  *
  * Parses the CLI arguments into a [[GerritEndpointConfig]], builds the project
  * statistics DataFrame, writes it as JSON to the configured output directory and
  * then passes it to `saveES`.
  */
object Main extends App with SparkApp with Job with LazyLogging with FetchRemoteProjects {

  override val appName = "Gerrit GitCommits Analytics ETL"

  // scopt validator: succeeds only when the given path exists on the local file system.
  private val fileExists: String => Either[String, Unit] = { path =>
    if (!new java.io.File(path).exists) {
      Left(s"ERROR: Path '$path' doesn't exists!")
    } else {
      // Explicit unit value: `Right()` only compiles through deprecated argument adaptation.
      Right(())
    }
  }

  // Every option is optional; each one copies the parsed value into the configuration.
  private val cliOptionParser: OptionParser[GerritEndpointConfig] =
    new scopt.OptionParser[GerritEndpointConfig]("scopt") {
      head("scopt", "3.x")
      opt[String]('u', "url") optional () action { (x, c) =>
        c.copy(baseUrl = Some(x))
      } text "gerrit url"
      opt[String]('p', "prefix") optional () action { (p, c) =>
        c.copy(prefix = Some(p))
      } text "projects prefix"
      opt[String]('o', "out") optional () action { (x, c) =>
        c.copy(outputDir = x)
      } text "output directory"
      opt[String]('e', "elasticIndex") optional () action { (x, c) =>
        c.copy(elasticIndex = Some(x))
      } text "index name"
      opt[LocalDate]('s', "since") optional () action { (x, c) =>
        c.copy(since = Some(x))
      } text "begin date "
      // Long name only: the short name 'u' is already taken by --url, so registering it
      // a second time made '-u' ambiguous between the two options.
      opt[LocalDate]("until") optional () action { (x, c) =>
        c.copy(until = Some(x))
      } text "end date"
      opt[String]('g', "aggregate") optional () action { (x, c) =>
        c.copy(aggregate = Some(x))
      } text "aggregate email/email_hour/email_day/email_month/email_year"
      opt[String]('a', "email-aliases") optional () validate fileExists action { (path, c) =>
        c.copy(emailAlias = Some(path))
      } text "\"emails to author alias\" input data path"
      opt[String]("events") optional () action { (eventsPath, config) =>
        config.copy(eventsPath = Some(eventsPath))
      } text "location where to load the Gerrit Events"
      opt[String]("writeNotProcessedEventsTo") optional () action { (failedEventsPath, config) =>
        config.copy(eventsFailureOutputPath = Some(failedEventsPath))
      } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
      opt[String]("username") optional () action { (input, c) =>
        c.copy(username = Some(input))
      } text "Gerrit API Username"
      opt[String]("password") optional () action { (input, c) =>
        c.copy(password = Some(input))
      } text "Gerrit API Password"
      opt[Boolean]('k', "ignore-ssl-cert") optional () action { (input, c) =>
        c.copy(ignoreSSLCert = Some(input))
      } text "Ignore SSL certificate validation"
      opt[Boolean]('r', "extract-branches") optional () action { (input, c) =>
        c.copy(extractBranches = Some(input))
      } text "enables branches extraction for each commit"
    }

  cliOptionParser.parse(args, GerritEndpointConfig()) match {
    case Some(config) =>
      // A named implicit: `implicit val _` depends on a version-specific treatment of `_`
      // as a definable name and is not guaranteed to stay visible to implicit search.
      implicit val endpointConfig: GerritEndpointConfig = config

      logger.info(s"Starting analytics app with config $config")

      val dataFrame = buildProjectStats().cache() //This dataframe is written twice

      logger.info(s"ES content created, saving it to '${config.outputDir}'")
      dataFrame.write.json(config.outputDir)

      saveES(dataFrame)

    case None => // invalid configuration usage has been displayed
  }
}
/** ETL steps shared by the analytics job: building the contributor statistics
  * DataFrame, loading Gerrit events and saving the result to Elasticsearch.
  *
  * Must be mixed into something that provides a logger and a [[FetchProjects]]
  * implementation (see the self-type).
  */
trait Job {
  self: LazyLogging with FetchProjects =>

  // Explicit type so the implicit does not depend on type inference to be resolvable.
  implicit val codec: Codec = Codec.ISO8859

  // Type segment appended to the Elasticsearch index name in `saveES`.
  val indexType = "gitCommits"

  /** Builds the statistics DataFrame.
    *
    * Steps, in order: resolve the aggregation strategy (falling back to
    * `aggregateByEmail` when missing or invalid), fetch the projects, load the events,
    * optionally write the unparsable events as a tab-separated file, collect the stats
    * from the analytics plugin and from the events, union the two when event stats are
    * present, and finally apply `dashboardStats` with the alias DataFrame.
    *
    * @param config job configuration
    * @param spark  active Spark session
    * @return the result of `dashboardStats` over the merged stats
    * @throws IllegalArgumentException when both stats are present but their schemas differ
    */
  def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
    import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._

    implicit val sc: SparkContext = spark.sparkContext

    val aggregationStrategy: AggregationStrategy =
      config.aggregate
        .flatMap { agg =>
          AggregationStrategy.byName(agg) match {
            case Success(strategy) => Some(strategy)
            case Failure(e) =>
              logger.warn(s"Invalid aggregation strategy '$agg'", e)
              None
          }
        }
        .getOrElse(AggregationStrategy.aggregateByEmail)

    val projects = fetchProjects(config)

    // Only the first 20 projects are printed, to keep the log line bounded.
    val projectsPreview =
      if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]")
      else projects.mkString("[", ",", "]")
    logger.info(s"Loaded a list of ${projects.size} projects $projectsPreview")

    val aliasesDF = getAliasDF(config.emailAlias)

    val events = loadEvents

    val failedEvents: RDD[NotParsableJsonEvent] = events.collect {
      case Left(eventFailureDescription) => eventFailureDescription
    }

    // The emptiness check is a Spark action, so it only runs when a failure output path
    // is configured and the result can actually be used.
    config.eventsFailureOutputPath.foreach { failurePath =>
      if (!failedEvents.isEmpty()) {
        logger.info(s"Events failures will be stored at '$failurePath'")

        import spark.implicits._
        failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
      }
    }

    //We might want to use the time of the events as information to feed to the collection of data from the repository
    val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents

    val firstEventDateMaybe: Option[LocalDate] =
      if (repositoryAlteringEvents.isEmpty()) None
      else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)

    // When events exist, 'until' is replaced by the earliest event date plus one month,
    // but only if that date is already in the past.
    val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
      val lastAggregationDate = firstEventDate.plusMonths(1)
      if (lastAggregationDate.isBefore(LocalDate.now())) {
        logger.info(
          s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
        config.copy(until = Some(lastAggregationDate))
      } else {
        config
      }
    }

    val statsFromAnalyticsPlugin =
      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects),
                                             configWithOverriddenUntil.contributorsUrl,
                                             config.gerritApiConnection)

    val statsFromEvents = getContributorStatsFromGerritEvents(
      repositoryAlteringEvents,
      statsFromAnalyticsPlugin.commitSet.rdd,
      aggregationStrategy)

    val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
      statsFromAnalyticsPlugin
    } else {
      require(
        statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
        s""" Schemas from the stats collected from events and from the analytics datasets differs!!
           | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
           | From gerrit events: ${statsFromEvents.schema}
      """.stripMargin
      )

      statsFromAnalyticsPlugin.union(statsFromEvents)
    }

    mergedEvents.dashboardStats(aliasesDF)
  }

  /** Loads the Gerrit events from `config.eventsPath`.
    *
    * @return an empty RDD when no events path is configured; otherwise the lines of the
    *         text file at that path passed through `parseEvents(EventParser)`, with
    *         failures on the left and parsed events on the right
    */
  def loadEvents(
      implicit config: GerritEndpointConfig,
      spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._

    config.eventsPath.fold(
      spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
      spark.read
        .textFile(eventsPath)
        .rdd
        .parseEvents(EventParser)
    }
  }

  /** Saves the DataFrame to Elasticsearch under `<elasticIndex>/<indexType>`.
    *
    * Does nothing when `config.elasticIndex` is not set.
    *
    * @param df     the DataFrame to save
    * @param config job configuration providing the optional index name
    */
  def saveES(df: DataFrame)(implicit config: GerritEndpointConfig): Unit = {
    import org.elasticsearch.spark.sql._
    config.elasticIndex.foreach { esIndex =>
      // Log the unwrapped index name; interpolating the Option printed 'Some(<index>)'.
      logger.info(
        s"ES content created, saving it to elastic search instance at '$esIndex/$indexType'")
      df.saveToEs(s"$esIndex/$indexType")
    }
  }
}
/** Abstraction over how the list of Gerrit projects to process is obtained.
  *
  * Required by the self-type of `Job`, which calls `fetchProjects` while building
  * the project stats.
  */
trait FetchProjects {
// Returns the projects to process for the given endpoint configuration.
def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
}
/** [[FetchProjects]] implementation that reads the project list through the Gerrit API
  * connection held by the configuration.
  */
trait FetchRemoteProjects extends FetchProjects {

  /** Fetches and parses the project list from `config.gerritProjectsUrl`.
    *
    * @param config endpoint configuration providing the projects URL and the API connection
    * @return the parsed projects, or an empty sequence when no projects URL is available
    */
  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
    val projectsUrls = config.gerritProjectsUrl.toSeq

    projectsUrls.flatMap { projectsUrl =>
      val apiResponse = config.gerritApiConnection.getContentFromApi(projectsUrl)
      GerritProjectsSupport.parseJsonProjectListResponse(apiResponse)
    }
  }
}