blob: 1ce5b7335cf647ed38fea3d37bf3fa86a61ce6a7 [file] [log] [blame]
// Copyright (C) 2017 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.job
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, ZoneId}
import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import scopt.Read.reads
import scopt.{OptionParser, Read}
import scala.io.{Codec, Source}
import scala.util.control.NonFatal
object Main extends App with Job with LazyLogging {
private val fileExists: String => Either[String, Unit] = { path =>
if (!new java.io.File(path).exists) {
Left(s"ERROR: Path '$path' doesn't exists!")
} else {
Right()
}
}
implicit val localDateRead: Read[LocalDate] = reads { str =>
val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
try {
LocalDate.parse(str, format)
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Invalid date '$str' expected format is '$format'")
}
}
private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
head("scopt", "3.x")
opt[String]('u', "url") optional() action { (x, c) =>
c.copy(baseUrl = x)
} text "gerrit url"
opt[String]('p', "prefix") optional() action { (p, c) =>
c.copy(prefix = Some(p))
} text "projects prefix"
opt[String]('o', "out") optional() action { (x, c) =>
c.copy(outputDir = x)
} text "output directory"
opt[String]('e', "elasticIndex") optional() action { (x, c) =>
c.copy(elasticIndex = Some(x))
} text "output directory"
opt[LocalDate]('s', "since") optional() action { (x, c) =>
c.copy(since = Some(x))
} text "begin date "
opt[LocalDate]('u', "until") optional() action { (x, c) =>
c.copy(until = Some(x))
} text "since date"
opt[String]('g', "aggregate") optional() action { (x, c) =>
c.copy(aggregate = Some(x))
} text "aggregate email/email_hour/email_day/email_month/email_year"
opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
c.copy(emailAlias = Some(path))
} text "\"emails to author alias\" input data path"
}
cliOptionParser.parse(args, GerritEndpointConfig()) match {
case Some(config) =>
implicit val spark: SparkSession = SparkSession.builder()
.appName("Gerrit Analytics ETL")
.getOrCreate()
implicit val _: GerritEndpointConfig = config
logger.info(s"Starting analytics app with config $config")
val dataFrame = buildProjectStats().cache() //This dataframe is written twice
logger.info(s"ES content created, saving it to '${config.outputDir}'")
dataFrame.write.json(config.outputDir)
saveES(dataFrame)
case None => // invalid configuration usage has been displayed
}
}
trait Job { self: LazyLogging =>
implicit val codec = Codec.ISO8859
def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
implicit val sc: SparkContext = spark.sparkContext
val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
val aliasesDF = getAliasDF(config.emailAlias)
getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), config.contributorsUrl)
.dashboardStats(aliasesDF)
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
import org.elasticsearch.spark.sql._
config.elasticIndex.foreach { esIndex =>
logger.info(s"ES content created, saving it to elastic search instance at '${config.elasticIndex}'")
df.saveToEs(esIndex)
}
}
}