Add Git commits Analytics namespace
This will allow a better code layout when adding new sources of analytics data (e.g., audit logs).
The package name has been extended with the name of the data source: "com.gerritforge.analytics.<source>"
Feature: Issue 9983
Change-Id: I3a8d912ab34550f2a21f51f66d5ca86671f5dfe2
diff --git a/README.md b/README.md
index 2cdf5d2..44c9229 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,7 @@
```bash
bin/spark-submit \
+ --class com.gerritforge.analytics.gitcommits.job \
--conf spark.es.nodes=es.mycompany.com \
$JARS/analytics-etl.jar \
--since 2000-06-01 \
diff --git a/build.sbt b/build.sbt
index 2a2c22e..3da5061 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@
val pluginName = "analytics-etl"
-val mainClassPackage = "com.gerritforge.analytics.job.Main"
+val mainClassPackage = "com.gerritforge.analytics.gitcommits.job.Main"
val dockerRepository = "spark-gerrit-analytics-etl"
libraryDependencies ++= Seq(
@@ -87,8 +87,8 @@
packageOptions in(Compile, packageBin) += Package.ManifestAttributes(
("Gerrit-ApiType", "plugin"),
("Gerrit-PluginName", pluginName),
- ("Gerrit-Module", "com.gerritforge.analytics.plugin.Module"),
- ("Gerrit-SshModule", "com.gerritforge.analytics.plugin.SshModule"),
+ ("Gerrit-Module", "com.gerritforge.analytics.gitcommits.plugin.Module"),
+ ("Gerrit-SshModule", "com.gerritforge.analytics.gitcommits.plugin.SshModule"),
("Implementation-Title", "Analytics ETL plugin"),
("Implementation-URL", "https://gerrit.googlesource.com/plugins/analytics-etl")
)
diff --git a/scripts/gerrit-analytics-etl.sh b/scripts/gerrit-analytics-etl.sh
index 30cf2cf..5a0de69 100755
--- a/scripts/gerrit-analytics-etl.sh
+++ b/scripts/gerrit-analytics-etl.sh
@@ -10,7 +10,7 @@
# Optional
ES_PORT="${ES_PORT:-9200}"
SPARK_JAR_PATH="${SPARK_JAR_PATH:-/app/analytics-etl-assembly.jar}"
-SPARK_JAR_CLASS="${SPARK_JAR_CLASS:-com.gerritforge.analytics.job.Main}"
+SPARK_JAR_CLASS="${SPARK_JAR_CLASS:-com.gerritforge.analytics.gitcommits.job.Main}"
echo "* Elastic Search Host: $ES_HOST:$ES_PORT"
echo "* Gerrit URL: $GERRIT_URL"
diff --git a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
index 8417f1d..ece68d1 100644
--- a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.api
+package com.gerritforge.analytics.gitcommits.api
import java.net.URL
@@ -21,7 +21,6 @@
import scala.io.{BufferedSource, Codec, Source}
-
sealed trait HttpBasicAuthentication {
val BASIC = "Basic"
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
index a0733bc..ada6a12 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.engine
+package com.gerritforge.analytics.gitcommits.engine
import java.io.IOException
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.model._
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.model._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
@@ -41,7 +41,6 @@
sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection))
}
-
def filterEmptyStrings(urlSource: BufferedSource): Iterator[String] =
urlSource.getLines()
.filterNot(_.trim.isEmpty)
@@ -163,7 +162,6 @@
private def emailToDomainUdf = udf(emailToDomain(_: String))
-
implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
def fetchRawContributors(gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession): RDD[(String, String)] = {
@@ -192,5 +190,4 @@
.toDF("project", "json")
.transformCommitterInfo
}
-
}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
similarity index 94%
rename from src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
index 95aa113..197367a 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
import java.text.DateFormat
import java.time.LocalDateTime
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
import scala.util.Try
@@ -84,4 +84,4 @@
super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
index 121b98c..07d9bc1 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
@@ -12,18 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
import java.time.{LocalDateTime, ZoneOffset}
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{ CommitInfo, UserActivitySummary }
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import scala.util.{Failure, Success}
-
object GerritEventsTransformations extends LazyLogging {
case class NotParsableJsonEvent(source: String, failureDescription: String)
@@ -35,6 +34,7 @@
}
implicit class PimpedGerritJsonEventRDD[T <: GerritJsonEvent](val self: RDD[T]) extends AnyVal {
+
/**
* Returns the UTC date of the earliest event in the RDD
*
@@ -97,7 +97,6 @@
}
}
-
private def addCommitSummary(summaryTemplate: UserActivitySummary, changes: Iterable[ChangeMergedEvent]): UserActivitySummary = {
// We assume all the changes are consistent on the is_merge filter
summaryTemplate.copy(
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/model.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
similarity index 98%
rename from src/main/scala/com/gerritforge/analytics/engine/events/model.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
index 15956a6..fe86d01 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/model.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
import com.typesafe.scalalogging.LazyLogging
import scala.util.{Failure, Success, Try}
@@ -23,12 +23,10 @@
def fromJson(json: String): Try[GerritJsonEvent]
}
-
object EventParser extends GerritJsonEventParser with LazyLogging {
val ChangeMergedEventType = "change-merged"
val RefUpdatedEventType = "ref-updated"
-
override def fromJson(json: String): Try[GerritJsonEvent] = {
import org.json4s._
import org.json4s.native.JsonMethods._
@@ -93,7 +91,6 @@
by: GerritAccount
)
-
case class GerritPatchSet(
number: String,
revision: String,
@@ -111,7 +108,6 @@
def isDraft : Boolean = draft.getOrElse(false)
}
-
sealed trait GerritJsonEvent {
def `type`: String
@@ -120,7 +116,6 @@
def account: GerritAccount
}
-
sealed trait GerritRepositoryModifiedEvent extends GerritJsonEvent {
def modifiedProject: String
@@ -161,7 +156,6 @@
case class GerritRefUpdate(project: String, refName: String, oldRev: String, newRev: String)
-
case class RefUpdatedEvent(
refUpdate: GerritRefUpdate,
submitter: Option[GerritAccount],
@@ -176,4 +170,4 @@
def modifiedRef: String = refUpdate.refName
def newRev: String = refUpdate.newRev
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
new file mode 100644
index 0000000..16a376a
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -0,0 +1,263 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.job
+
+import java.time.LocalDate
+
+import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import com.gerritforge.analytics.gitcommits.engine.events.{
+ AggregationStrategy,
+ EventParser,
+ GerritJsonEvent
+}
+import com.gerritforge.analytics.gitcommits.model.{
+ GerritEndpointConfig,
+ GerritProject,
+ GerritProjectsSupport
+}
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import scopt.Read.reads
+import scopt.{OptionParser, Read}
+
+import scala.io.Codec
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+object Main extends App with Job with LazyLogging with FetchRemoteProjects {
+
+ private val fileExists: String => Either[String, Unit] = { path =>
+ if (!new java.io.File(path).exists) {
+ Left(s"ERROR: Path '$path' doesn't exists!")
+ } else {
+ Right()
+ }
+ }
+
+ implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
+ val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+ try {
+ import AnalyticsTimeOps.implicits._
+ dateStr.parseStringToLocalDate(cliDateFormat).get
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(
+ s"Invalid date '$dateStr' expected format is '${cliDateFormat}'")
+ }
+ }
+
+ private val cliOptionParser: OptionParser[GerritEndpointConfig] =
+ new scopt.OptionParser[GerritEndpointConfig]("scopt") {
+ head("scopt", "3.x")
+ opt[String]('u', "url") optional () action { (x, c) =>
+ c.copy(baseUrl = Some(x))
+ } text "gerrit url"
+ opt[String]('p', "prefix") optional () action { (p, c) =>
+ c.copy(prefix = Some(p))
+ } text "projects prefix"
+ opt[String]('o', "out") optional () action { (x, c) =>
+ c.copy(outputDir = x)
+ } text "output directory"
+ opt[String]('e', "elasticIndex") optional () action { (x, c) =>
+ c.copy(elasticIndex = Some(x))
+ } text "index name"
+ opt[LocalDate]('s', "since") optional () action { (x, c) =>
+ c.copy(since = Some(x))
+ } text "begin date "
+ opt[LocalDate]('u', "until") optional () action { (x, c) =>
+ c.copy(until = Some(x))
+ } text "since date"
+ opt[String]('g', "aggregate") optional () action { (x, c) =>
+ c.copy(aggregate = Some(x))
+ } text "aggregate email/email_hour/email_day/email_month/email_year"
+
+ opt[String]('a', "email-aliases") optional () validate fileExists action { (path, c) =>
+ c.copy(emailAlias = Some(path))
+ } text "\"emails to author alias\" input data path"
+ opt[String]("events") optional () action { (eventsPath, config) =>
+ config.copy(eventsPath = Some(eventsPath))
+ } text "location where to load the Gerrit Events"
+ opt[String]("writeNotProcessedEventsTo") optional () action { (failedEventsPath, config) =>
+ config.copy(eventsFailureOutputPath = Some(failedEventsPath))
+ } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
+
+ opt[String]("username") optional () action { (input, c) =>
+ c.copy(username = Some(input))
+ } text "Gerrit API Username"
+
+ opt[String]("password") optional () action { (input, c) =>
+ c.copy(password = Some(input))
+ } text "Gerrit API Password"
+
+ opt[Boolean]('r', "extract-branches") optional () action { (input, c) =>
+ c.copy(extractBranches = Some(input))
+ } text "enables branches extraction for each commit"
+
+ }
+
+ cliOptionParser.parse(args, GerritEndpointConfig()) match {
+ case Some(config) =>
+ implicit val spark: SparkSession = SparkSession
+ .builder()
+ .appName("Gerrit Analytics ETL")
+ .getOrCreate()
+
+ implicit val _: GerritEndpointConfig = config
+
+ logger.info(s"Starting analytics app with config $config")
+
+ val dataFrame = buildProjectStats().cache() //This dataframe is written twice
+
+ logger.info(s"ES content created, saving it to '${config.outputDir}'")
+ dataFrame.write.json(config.outputDir)
+
+ saveES(dataFrame)
+
+ case None => // invalid configuration usage has been displayed
+ }
+}
+
+trait Job {
+ self: LazyLogging with FetchProjects =>
+ implicit val codec = Codec.ISO8859
+
+ def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+ import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+ import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
+
+ implicit val sc: SparkContext = spark.sparkContext
+
+ val aggregationStrategy: AggregationStrategy =
+ config.aggregate
+ .flatMap { agg =>
+ AggregationStrategy.byName(agg) match {
+ case Success(strategy) => Some(strategy)
+ case Failure(e) =>
+ logger.warn(s"Invalid aggregation strategy '$agg", e)
+ None
+ }
+ }
+ .getOrElse(AggregationStrategy.aggregateByEmail)
+
+ val projects = fetchProjects(config)
+
+ logger.info(
+ s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]")
+ else projects.mkString("[", ",", "]")}")
+
+ val aliasesDF = getAliasDF(config.emailAlias)
+
+ val events = loadEvents
+
+ val failedEvents: RDD[NotParsableJsonEvent] = events.collect {
+ case Left(eventFailureDescription) => eventFailureDescription
+ }
+
+ if (!failedEvents.isEmpty()) {
+ config.eventsFailureOutputPath.foreach { failurePath =>
+ logger.info(s"Events failures will be stored at '$failurePath'")
+
+ import spark.implicits._
+ failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
+ }
+ }
+
+ //We might want to use the time of the events as information to feed to the collection of data from the repository
+ val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
+
+ val firstEventDateMaybe: Option[LocalDate] =
+ if (repositoryAlteringEvents.isEmpty()) None
+ else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
+
+ val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
+ val lastAggregationDate = firstEventDate.plusMonths(1)
+ if (lastAggregationDate.isBefore(LocalDate.now())) {
+ logger.info(
+ s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
+ config.copy(until = Some(lastAggregationDate))
+ } else {
+ config
+ }
+ }
+
+ val statsFromAnalyticsPlugin =
+ getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects),
+ configWithOverriddenUntil.contributorsUrl,
+ config.gerritApiConnection)
+
+ val statsFromEvents = getContributorStatsFromGerritEvents(
+ repositoryAlteringEvents,
+ statsFromAnalyticsPlugin.commitSet.rdd,
+ aggregationStrategy)
+
+ val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
+ statsFromAnalyticsPlugin
+ } else {
+ require(
+ statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
+ s""" Schemas from the stats collected from events and from the analytics datasets differs!!
+ | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+ | From gerrit events: ${statsFromEvents.schema}
+ """.stripMargin
+ )
+
+ (statsFromAnalyticsPlugin union statsFromEvents)
+ }
+
+ mergedEvents.dashboardStats(aliasesDF)
+ }
+
+ def loadEvents(
+ implicit config: GerritEndpointConfig,
+ spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
+ import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
+
+ config.eventsPath.fold(
+ spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
+ spark.read
+ .textFile(eventsPath)
+ .rdd
+ .parseEvents(EventParser)
+ }
+ }
+
+ def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
+ import org.elasticsearch.spark.sql._
+ config.elasticIndex.foreach { esIndex =>
+ logger.info(
+ s"ES content created, saving it to elastic search instance at '${config.elasticIndex}'")
+
+ df.saveToEs(esIndex)
+ }
+
+ }
+}
+
+trait FetchProjects {
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
+}
+
+trait FetchRemoteProjects extends FetchProjects {
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+ config.gerritProjectsUrl.toSeq.flatMap { url =>
+ GerritProjectsSupport.parseJsonProjectListResponse(
+ config.gerritApiConnection.getContentFromApi(url))
+ }
+ }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/Email.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
similarity index 94%
rename from src/main/scala/com/gerritforge/analytics/model/Email.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
index e439ae7..e99a8f1 100644
--- a/src/main/scala/com/gerritforge/analytics/model/Email.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
case class Email(user: String, domain: String)
@@ -27,4 +27,4 @@
case _ => None
}
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
new file mode 100644
index 0000000..69e8cee
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
@@ -0,0 +1,65 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.model
+
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneOffset}
+
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+
+case class GerritEndpointConfig(
+ baseUrl: Option[String] = None,
+ prefix: Option[String] = None,
+ outputDir: String =
+ s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
+ elasticIndex: Option[String] = None,
+ since: Option[LocalDate] = None,
+ until: Option[LocalDate] = None,
+ aggregate: Option[String] = None,
+ emailAlias: Option[String] = None,
+ eventsPath: Option[String] = None,
+ eventsFailureOutputPath: Option[String] = None,
+ username: Option[String] = None,
+ password: Option[String] = None,
+ extractBranches: Option[Boolean] = None) {
+
+ val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
+
+ val gerritProjectsUrl: Option[String] = baseUrl.map { url =>
+ s"${url}/projects/" + prefix.fold("")("?p=" + _)
+ }
+
+ def queryOpt(opt: (String, Option[String])): Option[String] = {
+ opt match {
+ case (name: String, value: Option[String]) => value.map(name + "=" + _)
+ }
+ }
+
+ @transient
+ private lazy val format: DateTimeFormatter =
+ AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
+ val queryString = Seq(
+ "since" -> since.map(format.format),
+ "until" -> until.map(format.format),
+ "aggregate" -> aggregate,
+ "extract-branches" -> extractBranches.map(_.toString)
+ ).flatMap(queryOpt).mkString("?", "&", "")
+
+ def contributorsUrl(projectName: String): Option[String] =
+ baseUrl.map { url =>
+ s"$url/projects/$projectName/analytics~contributors$queryString"
+ }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
similarity index 96%
rename from src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
index 139791d..5dda011 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
import com.google.gerrit.extensions.api.GerritApi
import com.google.inject.Inject
@@ -51,4 +51,4 @@
}
}
-case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
\ No newline at end of file
+case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
similarity index 93%
rename from src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
index c980431..51f1c39 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
@@ -1,4 +1,4 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
import com.google.gerrit.server.config.GerritServerConfig
import com.google.inject.Inject
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
similarity index 68%
rename from src/main/scala/com/gerritforge/analytics/plugin/Module.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
index a42e44e..e7ca14c 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
@@ -1,4 +1,4 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
import com.google.inject.AbstractModule
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
new file mode 100644
index 0000000..883b1f8
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -0,0 +1,134 @@
+package com.gerritforge.analytics.gitcommits.plugin
+
+import java.sql.Timestamp
+import java.time.LocalDate
+
+import com.gerritforge.analytics.gitcommits.engine.events.AggregationStrategy
+import com.gerritforge.analytics.gitcommits.job.{FetchProjects, Job}
+import com.gerritforge.analytics.gitcommits.model.{
+ GerritEndpointConfig,
+ GerritProject,
+ GerritProjectsSupport
+}
+import com.google.gerrit.server.project.ProjectControl
+import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+import org.kohsuke.args4j.{Argument, Option => ArgOption}
+
+import scala.io.Source
+import scala.util.{Failure, Success}
+
+@CommandMetaData(name = "processGitCommits",
+ description = "Start the extraction of Git Commits Gerrit analytics")
+class ProcessGitCommitsCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport,
+ val gerritConfig: GerritConfigSupport)
+ extends SshCommand
+ with Job
+ with DateConversions
+ with FetchProjects
+ with LazyLogging {
+
+ @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
+ var projectControl: ProjectControl = null
+
+ @ArgOption(name = "--elasticIndex", aliases = Array("-e"), usage = "index name")
+ var elasticIndex: String = "gerrit/analytics"
+
+ @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
+ var beginDate: Timestamp = NO_TIMESTAMP
+
+ @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
+ var endDate: Timestamp = NO_TIMESTAMP
+
+ @ArgOption(name = "--aggregate",
+ aliases = Array("-g"),
+ usage = "aggregate email/email_hour/email_day/email_month/email_year")
+ var aggregate: String = "email_day"
+
+ @ArgOption(name = "--email-aliases",
+ aliases = Array("-a"),
+ usage = "\"emails to author alias\" input data path")
+ var emailAlias: String = null
+
+ @ArgOption(name = "--extract-branches",
+ aliases = Array("-r"),
+ usage = "enables branches extraction for each commit")
+ var extractBranches: Boolean = false
+
+ override def run() {
+ implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(),
+ prefix =
+ Option(projectControl).map(_.getProject.getName),
+ "",
+ elasticIndex,
+ beginDate,
+ endDate,
+ aggregate,
+ emailAlias)
+
+ implicit val spark: SparkSession = SparkSession
+ .builder()
+ .appName("analytics-etl")
+ .master("local")
+ .config("key", "value")
+ .getOrCreate()
+
+ implicit lazy val sc: SparkContext = spark.sparkContext
+ implicit lazy val sql: SQLContext = spark.sqlContext
+
+ val prevClassLoader = Thread.currentThread().getContextClassLoader
+ Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+ try {
+ stdout.println(s"Starting new Spark job with parameters: $config")
+ stdout.flush()
+ val startTs = System.currentTimeMillis
+ val projectStats = buildProjectStats().cache()
+ val numRows = projectStats.count()
+
+ import org.elasticsearch.spark.sql._
+ config.elasticIndex.foreach { esIndex =>
+ stdout.println(
+ s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}'")
+ stdout.flush()
+ projectStats.saveToEs(esIndex)
+ }
+
+ val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
+ stdout.println(s"Job COMPLETED in $elaspsedTs secs")
+ } catch {
+ case e: Throwable =>
+ e.printStackTrace()
+ stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
+ die(e)
+ } finally {
+ Thread.currentThread().setContextClassLoader(prevClassLoader)
+ }
+ }
+
+ def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+ config.prefix.toSeq.flatMap(projectName =>
+ gerritProjects.getProject(projectName) match {
+ case Success(project) =>
+ Seq(project)
+ case Failure(e) => {
+ logger.warn(s"Unable to fetch project $projectName", e)
+ Seq()
+ }
+ })
+ }
+}
+
+trait DateConversions {
+ val NO_TIMESTAMP = new Timestamp(0L)
+
+ implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+ case NO_TIMESTAMP => None
+ case ts => Some(ts.toLocalDateTime.toLocalDate)
+ }
+
+ implicit def nullableStringToOption(nullableString: String): Option[String] =
+ Option(nullableString)
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
similarity index 60%
rename from src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
index 290b1ff..9d1bc0b 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
@@ -1,9 +1,9 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
import com.google.gerrit.sshd.PluginCommandModule
class SshModule extends PluginCommandModule {
override protected def configureCommands {
- command(classOf[StartCommand])
+ command(classOf[ProcessGitCommitsCommand])
}
}
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
index 9012617..5628ceb 100644
--- a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-
-package com.gerritforge.analytics.support.ops
+package com.gerritforge.analytics.gitcommits.support.ops
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset}
@@ -62,7 +61,6 @@
def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
}
-
implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
@@ -83,4 +81,4 @@
implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
deleted file mode 100644
index 099efac..0000000
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.job
-
-import java.time.LocalDate
-
-import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-import com.typesafe.scalalogging.LazyLogging
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import scopt.Read.reads
-import scopt.{OptionParser, Read}
-
-import scala.io.Codec
-import scala.util.control.NonFatal
-import scala.util.{Failure, Success}
-
-object Main extends App with Job with LazyLogging with FetchRemoteProjects {
-
- private val fileExists: String => Either[String, Unit] = { path =>
- if (!new java.io.File(path).exists) {
- Left(s"ERROR: Path '$path' doesn't exists!")
- } else {
- Right()
- }
- }
-
- implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
- val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
- try {
- import AnalyticsTimeOps.implicits._
- dateStr.parseStringToLocalDate(cliDateFormat).get
- } catch {
- case NonFatal(e) =>
- throw new IllegalArgumentException(s"Invalid date '$dateStr' expected format is '${cliDateFormat}'")
- }
- }
-
- private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
- head("scopt", "3.x")
- opt[String]('u', "url") optional() action { (x, c) =>
- c.copy(baseUrl = Some(x))
- } text "gerrit url"
- opt[String]('p', "prefix") optional() action { (p, c) =>
- c.copy(prefix = Some(p))
- } text "projects prefix"
- opt[String]('o', "out") optional() action { (x, c) =>
- c.copy(outputDir = x)
- } text "output directory"
- opt[String]('e', "elasticIndex") optional() action { (x, c) =>
- c.copy(elasticIndex = Some(x))
- } text "index name"
- opt[LocalDate]('s', "since") optional() action { (x, c) =>
- c.copy(since = Some(x))
- } text "begin date "
- opt[LocalDate]('u', "until") optional() action { (x, c) =>
- c.copy(until = Some(x))
- } text "since date"
- opt[String]('g', "aggregate") optional() action { (x, c) =>
- c.copy(aggregate = Some(x))
- } text "aggregate email/email_hour/email_day/email_month/email_year"
-
- opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
- c.copy(emailAlias = Some(path))
- } text "\"emails to author alias\" input data path"
- opt[String]("events") optional() action { (eventsPath, config) =>
- config.copy(eventsPath = Some(eventsPath))
- } text "location where to load the Gerrit Events"
- opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
- config.copy(eventsFailureOutputPath = Some(failedEventsPath))
- } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
-
- opt[String]("username") optional() action { (input, c) =>
- c.copy(username = Some(input))
- } text "Gerrit API Username"
-
- opt[String]("password") optional() action { (input, c) =>
- c.copy(password = Some(input))
- } text "Gerrit API Password"
-
- opt[Boolean]('r', "extract-branches") optional() action { (input, c) =>
- c.copy(extractBranches = Some(input))
- } text "enables branches extraction for each commit"
-
- }
-
- cliOptionParser.parse(args, GerritEndpointConfig()) match {
- case Some(config) =>
- implicit val spark: SparkSession = SparkSession.builder()
- .appName("Gerrit Analytics ETL")
- .getOrCreate()
-
- implicit val _: GerritEndpointConfig = config
-
- logger.info(s"Starting analytics app with config $config")
-
- val dataFrame = buildProjectStats().cache() //This dataframe is written twice
-
- logger.info(s"ES content created, saving it to '${config.outputDir}'")
- dataFrame.write.json(config.outputDir)
-
- saveES(dataFrame)
-
- case None => // invalid configuration usage has been displayed
- }
-}
-
-trait Job {
- self: LazyLogging with FetchProjects =>
- implicit val codec = Codec.ISO8859
-
- def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
- import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
- import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
-
- implicit val sc: SparkContext = spark.sparkContext
-
- val aggregationStrategy: AggregationStrategy =
- config.aggregate.flatMap { agg =>
- AggregationStrategy.byName(agg) match {
- case Success(strategy) => Some(strategy)
- case Failure(e) =>
- logger.warn(s"Invalid aggregation strategy '$agg", e)
- None
- }
- }.getOrElse(AggregationStrategy.aggregateByEmail)
-
- val projects = fetchProjects(config)
-
- logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
-
- val aliasesDF = getAliasDF(config.emailAlias)
-
- val events = loadEvents
-
- val failedEvents: RDD[NotParsableJsonEvent] = events.collect { case Left(eventFailureDescription) => eventFailureDescription }
-
- if (!failedEvents.isEmpty()) {
- config.eventsFailureOutputPath.foreach { failurePath =>
- logger.info(s"Events failures will be stored at '$failurePath'")
-
- import spark.implicits._
- failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
- }
- }
-
- //We might want to use the time of the events as information to feed to the collection of data from the repository
- val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
-
- val firstEventDateMaybe: Option[LocalDate] = if (repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
-
- val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
- val lastAggregationDate = firstEventDate.plusMonths(1)
- if (lastAggregationDate.isBefore(LocalDate.now())) {
- logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
- config.copy(until = Some(lastAggregationDate))
- } else {
- config
- }
- }
-
- val statsFromAnalyticsPlugin =
- getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl, config.gerritApiConnection)
-
- val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
-
- val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
- statsFromAnalyticsPlugin
- } else {
- require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
- s""" Schemas from the stats collected from events and from the analytics datasets differs!!
- | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
- | From gerrit events: ${statsFromEvents.schema}
- """.stripMargin)
-
- (statsFromAnalyticsPlugin union statsFromEvents)
- }
-
- mergedEvents.dashboardStats(aliasesDF)
- }
-
- def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
- import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
-
- config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
- spark
- .read.textFile(eventsPath).rdd
- .parseEvents(EventParser)
- }
- }
-
- def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
- import org.elasticsearch.spark.sql._
- config.elasticIndex.foreach { esIndex =>
- logger.info(s"ES content created, saving it to elastic search instance at '${config.elasticIndex}'")
-
- df.saveToEs(esIndex)
- }
-
- }
-}
-
-trait FetchProjects {
- def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
-}
-
-trait FetchRemoteProjects extends FetchProjects {
- def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
- config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(config.gerritApiConnection.getContentFromApi(url)) }
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
deleted file mode 100644
index d76743d..0000000
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.model
-
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneOffset}
-
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-
-case class GerritEndpointConfig(baseUrl: Option[String] = None,
- prefix: Option[String] = None,
- outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
- elasticIndex: Option[String] = None,
- since: Option[LocalDate] = None,
- until: Option[LocalDate] = None,
- aggregate: Option[String] = None,
- emailAlias: Option[String] = None,
- eventsPath: Option[String] = None,
- eventsFailureOutputPath: Option[String] = None,
- username: Option[String] = None,
- password: Option[String] = None,
- extractBranches: Option[Boolean] = None
- ) {
-
-
- val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
-
- val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
-
- def queryOpt(opt: (String, Option[String])): Option[String] = {
- opt match {
- case (name: String, value: Option[String]) => value.map(name + "=" + _)
- }
- }
-
- @transient
- private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
- val queryString = Seq(
- "since" -> since.map(format.format),
- "until" -> until.map(format.format),
- "aggregate" -> aggregate,
- "extract-branches" -> extractBranches.map(_.toString)
- ).flatMap(queryOpt).mkString("?", "&", "")
-
- def contributorsUrl(projectName: String): Option[String] =
- baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
deleted file mode 100644
index 9832df5..0000000
--- a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.gerritforge.analytics.plugin
-
-import java.sql.Timestamp
-import java.time.LocalDate
-
-import com.gerritforge.analytics.engine.events.AggregationStrategy
-import com.gerritforge.analytics.job.{FetchProjects, Job}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
-import com.google.gerrit.server.project.ProjectControl
-import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
-import com.google.inject.Inject
-import com.typesafe.scalalogging.LazyLogging
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
-import org.kohsuke.args4j.{Argument, Option => ArgOption}
-
-import scala.io.Source
-import scala.util.{Failure, Success}
-
-@CommandMetaData(name = "start", description = "Start the extraction of Gerrit analytics")
-class StartCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport, val gerritConfig: GerritConfigSupport)
- extends SshCommand with Job with DateConversions with FetchProjects with LazyLogging {
-
- @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
- var projectControl: ProjectControl = null
-
- @ArgOption(name = "--elasticIndex", aliases = Array("-e"),
- usage = "index name")
- var elasticIndex: String = "gerrit/analytics"
-
- @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
- var beginDate: Timestamp = NO_TIMESTAMP
-
- @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
- var endDate: Timestamp = NO_TIMESTAMP
-
- @ArgOption(name = "--aggregate", aliases = Array("-g"), usage = "aggregate email/email_hour/email_day/email_month/email_year")
- var aggregate: String = "email_day"
-
- @ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
- var emailAlias: String = null
-
- @ArgOption(name = "--extract-branches", aliases = Array("-r"), usage = "enables branches extraction for each commit")
- var extractBranches: Boolean = false
-
- override def run() {
- implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
- beginDate, endDate, aggregate, emailAlias)
-
- implicit val spark: SparkSession = SparkSession.builder()
- .appName("analytics-etl")
- .master("local")
- .config("key", "value")
- .getOrCreate()
-
- implicit lazy val sc: SparkContext = spark.sparkContext
- implicit lazy val sql: SQLContext = spark.sqlContext
-
- val prevClassLoader = Thread.currentThread().getContextClassLoader
- Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
- try {
- stdout.println(s"Starting new Spark job with parameters: $config")
- stdout.flush()
- val startTs = System.currentTimeMillis
- val projectStats = buildProjectStats().cache()
- val numRows = projectStats.count()
-
- import org.elasticsearch.spark.sql._
- config.elasticIndex.foreach { esIndex =>
- stdout.println(s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}'")
- stdout.flush()
- projectStats.saveToEs(esIndex)
- }
-
- val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
- stdout.println(s"Job COMPLETED in $elaspsedTs secs")
- } catch {
- case e: Throwable =>
- e.printStackTrace()
- stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
- die(e)
- } finally {
- Thread.currentThread().setContextClassLoader(prevClassLoader)
- }
- }
-
- def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
- config.prefix.toSeq.flatMap(projectName => gerritProjects.getProject(projectName) match {
- case Success(project) =>
- Seq(project)
- case Failure(e) => {
- logger.warn(s"Unable to fetch project $projectName", e)
- Seq()
- }
- })
- }
-}
-
-trait DateConversions {
- val NO_TIMESTAMP = new Timestamp(0L)
-
- implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
- case NO_TIMESTAMP => None
- case ts => Some(ts.toLocalDateTime.toLocalDate)
- }
-
- implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
-}
-
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index ec9d3e5..65f4571 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -17,9 +17,9 @@
import java.io.{ByteArrayInputStream, File, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+import com.gerritforge.analytics.gitcommits.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s._
@@ -48,7 +48,6 @@
projects should contain only(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
}
-
"enrichWithSource" should "enrich project RDD object with its source" in {
val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
@@ -75,7 +74,6 @@
""".stripMargin
val expectedResult = List("LineOne", "LineTwo", "LineThree")
-
val inputStream = new ByteArrayInputStream(contentWithEmptyLines.getBytes)
val contentWithoutEmptyLines = filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
@@ -179,7 +177,6 @@
("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
("aliased_author_null_organization", "aliased_email@null_organization.com", null)
-
)).toDF("author", "email", "organization")
val inputSampleDF = sc.parallelize(Seq(
@@ -196,8 +193,7 @@
val df = inputSampleDF.handleAliases(Some(aliasDF))
- df.schema.fields.map(_.name) should contain allOf(
- "author", "email", "organization")
+ df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
df.collect should contain theSameElementsAs expectedDF.collect
}
@@ -214,18 +210,18 @@
val df = inputSampleDF.handleAliases(Some(aliasDF))
- df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
+ df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
}
it should "return correct columns when alias DF is not defined" in {
import spark.implicits._
val expectedTuple = ("author_name", "email@mail.com", "an_organization")
val inputSampleDF = sc.parallelize(Seq(expectedTuple)).toDF("author", "email", "organization")
- val expectedRow = Row.fromTuple(expectedTuple)
+ val expectedRow = Row.fromTuple(expectedTuple)
val df = inputSampleDF.handleAliases(None)
- df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
+ df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
df.collect().head should be(expectedRow)
}
@@ -262,7 +258,6 @@
"email@mail.companyname-couk.co.uk",
"email@mail.companyname-com.com",
"email@mail.companyname-info.info"
-
)).toDF("email")
val transformed = df.addOrganization()
@@ -283,7 +278,6 @@
)
}
-
"extractCommitsPerProject" should "generate a Dataset with the all the SHA of commits with associated project" in {
import sql.implicits._
@@ -306,8 +300,7 @@
}
private def newSource(contributorsJson: JObject*): Option[String] = {
- val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
- s"${getClass.getName}-${System.nanoTime()}")
+ val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),s"${getClass.getName}-${System.nanoTime()}")
val out = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)
contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
deleted file mode 100644
index 434662f..0000000
--- a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-// Copyright (C) 2018 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine.events
-
-import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
-import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import org.apache.spark.rdd.RDD
-import org.scalatest.{Inside, Matchers, WordSpec}
-
-class GerritEventsTransformationsSpec extends WordSpec with Matchers with SparkTestSupport with Inside with EventFixture {
-
- "tryParseGerritTriggeredEvent" should {
-
- implicit val eventParser: GerritJsonEventParser = EventParser
-
- "Parse a correctly formed event" in new EventFixture {
- GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(refUpdated.event)
- }
-
- "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
- val invalidJson = "invalid json string"
- GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
- }
-
- "Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in {
- val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
-
- GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
- }
- }
-
- "PimpedJsonRDD" should {
- "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
- val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
-
- import GerritEventsTransformations._
-
- jsonRdd.parseEvents(EventParser).collect() should contain only(
- Right(refUpdated.event),
- Right(changeMerged.event),
- Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
- )
- }
- }
-
- "extractUserActivitySummary" should {
- implicit val eventParser: GerritJsonEventParser = EventParser
-
- "Build one UserActivitySummary object if given a series of non-merge commits" in {
- val events: Seq[ChangeMergedEvent] = Seq(
- aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1, branch = "stable-1.14"),
- aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0, branch = "stable-1.14"),
- aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4, branch = "stable-1.14"),
- aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2, branch = "stable-1.14"),
- aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1, branch = "stable-1.14")
- ).map(_.event)
-
- val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
- changes = events,
- year = 2018, month = 1, day = 10, hour = 1
- )
-
- summaries should have size 1
-
- inside(summaries.head) {
- case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, branches, last_commit_date, is_merge) =>
- year shouldBe 2018
- month shouldBe 1
- day shouldBe 10
- hour shouldBe 1
- name shouldBe "Administrator"
- email shouldBe "admin@example.com"
- num_commits shouldBe events.size
- last_commit_date shouldBe 1005000l
- is_merge shouldBe false
- added_lines shouldBe 7
- deleted_lines shouldBe 8
- commits should contain only(
- CommitInfo("rev1", 1001000l, false),
- CommitInfo("rev2", 1002000l, false),
- CommitInfo("rev3", 1003000l, false),
- CommitInfo("rev4", 1004000l, false),
- CommitInfo("rev5", 1005000l, false)
- )
- branches should contain only "stable-1.14"
- }
- }
-
- "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
- val events: Seq[ChangeMergedEvent] = Seq(
- aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
- aChangeMergedEvent("2", 1002l, newRev = "rev2"),
- aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
- aChangeMergedEvent("4", 1004l, newRev = "rev4"),
- aChangeMergedEvent("5", 1005l, newRev = "rev5")
- ).map(_.event)
-
- val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
- changes = events,
- year = 2018, month = 1, day = 10, hour = 1
- )
-
- summaries should have size 2
-
- summaries.foreach { summary =>
- inside(summary) {
- case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _, _) =>
- year shouldBe 2018
- month shouldBe 1
- day shouldBe 10
- hour shouldBe 1
- name shouldBe "Administrator"
- email shouldBe "admin@example.com"
- }
- }
-
- summaries.foreach { summary =>
- inside(summary) {
- case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, false) =>
- num_commits shouldBe 3
- last_commit_date shouldBe 1005000l
- commits should contain only(
- CommitInfo("rev2", 1002000l, false),
- CommitInfo("rev4", 1004000l, false),
- CommitInfo("rev5", 1005000l, false)
- )
-
- case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, true) =>
- num_commits shouldBe 2
- last_commit_date shouldBe 1003000l
- commits should contain only(
- CommitInfo("rev1", 1001000l, true),
- CommitInfo("rev3", 1003000l, true)
- )
- }
- }
-
- }
- }
-
- "Pimped Per Project UserActivitySummary RDD" should {
- "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
- import GerritEventsTransformations._
- import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
- import spark.implicits._
-
- val aliasDF = sc.parallelize(Seq(
- ("stefano_alias", "stefano@galarraga-org.com", "")
- )).toDF("author", "email", "organization")
-
- val expectedDate = System.currentTimeMillis
-
- val analyticsJobOutput =
- sc.parallelize(Seq(
- "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)),
- Array("master", "stable-2.14"), expectedDate, false)
- ))
- .asEtlDataFrame(sql)
- .addOrganization()
- .handleAliases(Some(aliasDF))
- .dropCommits
-
- val expected = sc.parallelize(Seq(
- ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, Array("master", "stable-2.14"), "galarraga-org")
- )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
- "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "branches", "organization")
-
- val collected = analyticsJobOutput.collect()
-
- collected should contain theSameElementsAs expected.collect()
- }
- }
-
- "removeEventsForCommits" should {
- "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
- import GerritEventsTransformations._
-
- val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
- val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
- val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(Seq(
- aRefUpdatedEvent("oldRev", "RevToExclude1"),
- toKeep1,
- aRefUpdatedEvent("oldRev", "RevToExclude2"),
- aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
- toKeep2
- ).map(_.event))
-
- events
- .removeEventsForCommits(sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
- .collect() should contain only (toKeep1.event, toKeep2.event )
- }
- }
-}
-
-trait EventFixture {
-
- // Forcing early type failures
- case class JsonEvent[T <: GerritJsonEvent](json: String) {
- val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
- }
-
- val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(oldRev = "863b64002f2a9922deba69407804a44703c996e0", newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
-
- val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent("I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
-
- def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) = JsonEvent[RefUpdatedEvent](
- s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
- | "refUpdate":{"oldRev":"$oldRev",
- | "newRev":"$newRev",
- | "refName": "refs/heads/master","project":"subcut"},
- |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
-
- def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
- isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0, branch: String = "master") = JsonEvent[ChangeMergedEvent](
- s"""{
- |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
- |"newRev":"$newRev",
- |"patchSet":{
- | "number":1,
- | "revision":"$newRev",
- | "parents": ${if (isMergeCommit) """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]""" else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
- | "ref":"refs/changes/01/1/1",
- | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
- | "createdOn":1516530259,
- | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
- | "isDraft":false,
- | "kind":"REWORK",
- | "sizeInsertions":$insertions,
- | "sizeDeletions":$deletions
- |},
- |"change":{
- | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
- | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
- | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
- | "createdOn":1516530259,"status":"MERGED"
- |},
- |"project":{"name":"subcut"},
- |"refName":"refs/heads/$branch",
- |"changeKey":{"id":"$changeId"},
- |"type":"change-merged",
- |"eventCreatedOn": $createdOnInSecs
- |}""".stripMargin)
-
-}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
new file mode 100644
index 0000000..043bd5b
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
@@ -0,0 +1,424 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.engine.events
+
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{
+ CommitInfo,
+ UserActivitySummary
+}
+import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import org.apache.spark.rdd.RDD
+import org.scalatest.{Inside, Matchers, WordSpec}
+
+class GerritEventsTransformationsSpec
+ extends WordSpec
+ with Matchers
+ with SparkTestSupport
+ with Inside
+ with EventFixture {
+
+ "tryParseGerritTriggeredEvent" should {
+
+ implicit val eventParser: GerritJsonEventParser = EventParser
+
+ "Parse a correctly formed event" in new EventFixture {
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(
+ refUpdated.event)
+ }
+
+ "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
+ val invalidJson = "invalid json string"
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(
+ NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
+ }
+
+ "Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in {
+ val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
+
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(
+ NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
+ }
+ }
+
+ "PimpedJsonRDD" should {
+ "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
+ val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
+
+ import GerritEventsTransformations._
+
+ jsonRdd.parseEvents(EventParser).collect() should contain only (
+ Right(refUpdated.event),
+ Right(changeMerged.event),
+ Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
+ )
+ }
+ }
+
+ "extractUserActivitySummary" should {
+ implicit val eventParser: GerritJsonEventParser = EventParser
+
+ "Build one UserActivitySummary object if given a series of non-merge commits" in {
+ val events: Seq[ChangeMergedEvent] = Seq(
+ aChangeMergedEvent("1",
+ 1001l,
+ newRev = "rev1",
+ insertions = 2,
+ deletions = 1,
+ branch = "stable-1.14"),
+ aChangeMergedEvent("2",
+ 1002l,
+ newRev = "rev2",
+ insertions = 3,
+ deletions = 0,
+ branch = "stable-1.14"),
+ aChangeMergedEvent("3",
+ 1003l,
+ newRev = "rev3",
+ insertions = 1,
+ deletions = 4,
+ branch = "stable-1.14"),
+ aChangeMergedEvent("4",
+ 1004l,
+ newRev = "rev4",
+ insertions = 0,
+ deletions = 2,
+ branch = "stable-1.14"),
+ aChangeMergedEvent("5",
+ 1005l,
+ newRev = "rev5",
+ insertions = 1,
+ deletions = 1,
+ branch = "stable-1.14")
+ ).map(_.event)
+
+ val summaries: Iterable[UserActivitySummary] =
+ GerritEventsTransformations.extractUserActivitySummary(
+ changes = events,
+ year = 2018,
+ month = 1,
+ day = 10,
+ hour = 1
+ )
+
+ summaries should have size 1
+
+ inside(summaries.head) {
+ case UserActivitySummary(year,
+ month,
+ day,
+ hour,
+ name,
+ email,
+ num_commits,
+ _,
+ _,
+ added_lines,
+ deleted_lines,
+ commits,
+ branches,
+ last_commit_date,
+ is_merge) =>
+ year shouldBe 2018
+ month shouldBe 1
+ day shouldBe 10
+ hour shouldBe 1
+ name shouldBe "Administrator"
+ email shouldBe "admin@example.com"
+ num_commits shouldBe events.size
+ last_commit_date shouldBe 1005000l
+ is_merge shouldBe false
+ added_lines shouldBe 7
+ deleted_lines shouldBe 8
+ commits should contain only (
+ CommitInfo("rev1", 1001000l, false),
+ CommitInfo("rev2", 1002000l, false),
+ CommitInfo("rev3", 1003000l, false),
+ CommitInfo("rev4", 1004000l, false),
+ CommitInfo("rev5", 1005000l, false)
+ )
+ branches should contain only "stable-1.14"
+ }
+ }
+
+ "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
+ val events: Seq[ChangeMergedEvent] = Seq(
+ aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
+ aChangeMergedEvent("2", 1002l, newRev = "rev2"),
+ aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
+ aChangeMergedEvent("4", 1004l, newRev = "rev4"),
+ aChangeMergedEvent("5", 1005l, newRev = "rev5")
+ ).map(_.event)
+
+ val summaries: Iterable[UserActivitySummary] =
+ GerritEventsTransformations.extractUserActivitySummary(
+ changes = events,
+ year = 2018,
+ month = 1,
+ day = 10,
+ hour = 1
+ )
+
+ summaries should have size 2
+
+ summaries.foreach { summary =>
+ inside(summary) {
+ case UserActivitySummary(year,
+ month,
+ day,
+ hour,
+ name,
+ email,
+ _,
+ _,
+ _,
+ _,
+ _,
+ _,
+ _,
+ _,
+ _) =>
+ year shouldBe 2018
+ month shouldBe 1
+ day shouldBe 10
+ hour shouldBe 1
+ name shouldBe "Administrator"
+ email shouldBe "admin@example.com"
+ }
+ }
+
+ summaries.foreach { summary =>
+ inside(summary) {
+ case UserActivitySummary(_,
+ _,
+ _,
+ _,
+ _,
+ _,
+ num_commits,
+ _,
+ _,
+ _,
+ _,
+ commits,
+ _,
+ last_commit_date,
+ false) =>
+ num_commits shouldBe 3
+ last_commit_date shouldBe 1005000l
+ commits should contain only (
+ CommitInfo("rev2", 1002000l, false),
+ CommitInfo("rev4", 1004000l, false),
+ CommitInfo("rev5", 1005000l, false)
+ )
+
+ case UserActivitySummary(_,
+ _,
+ _,
+ _,
+ _,
+ _,
+ num_commits,
+ _,
+ _,
+ _,
+ _,
+ commits,
+ _,
+ last_commit_date,
+ true) =>
+ num_commits shouldBe 2
+ last_commit_date shouldBe 1003000l
+ commits should contain only (
+ CommitInfo("rev1", 1001000l, true),
+ CommitInfo("rev3", 1003000l, true)
+ )
+ }
+ }
+
+ }
+ }
+
+ "Pimped Per Project UserActivitySummary RDD" should {
+ "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
+ import GerritEventsTransformations._
+ import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+ import spark.implicits._
+
+ val aliasDF = sc
+ .parallelize(
+ Seq(
+ ("stefano_alias", "stefano@galarraga-org.com", "")
+ ))
+ .toDF("author", "email", "organization")
+
+ val expectedDate = System.currentTimeMillis
+
+ val analyticsJobOutput =
+ sc.parallelize(
+ Seq(
+ "project1" -> UserActivitySummary(
+ 2018,
+ 1,
+ 20,
+ 10,
+ "Stefano",
+ "stefano@galarraga-org.com",
+ 1,
+ 2,
+ 1,
+ 10,
+ 4,
+ Array(CommitInfo("sha1", expectedDate, false)),
+ Array("master", "stable-2.14"),
+ expectedDate,
+ false
+ )
+ ))
+ .asEtlDataFrame(sql)
+ .addOrganization()
+ .handleAliases(Some(aliasDF))
+ .dropCommits
+
+ val expected = sc
+ .parallelize(
+ Seq(
+ ("project1",
+ "stefano_alias",
+ "stefano@galarraga-org.com",
+ 2018,
+ 1,
+ 20,
+ 10,
+ 2,
+ 1,
+ 10,
+ 4,
+ 1,
+ expectedDate,
+ false,
+ Array("master", "stable-2.14"),
+ "galarraga-org")
+ ))
+ .toDF(
+ "project",
+ "author",
+ "email",
+ "year",
+ "month",
+ "day",
+ "hour",
+ "num_files",
+ "num_distinct_files",
+ "added_lines",
+ "deleted_lines",
+ "num_commits",
+ "last_commit_date",
+ "is_merge",
+ "branches",
+ "organization"
+ )
+
+ val collected = analyticsJobOutput.collect()
+
+ collected should contain theSameElementsAs expected.collect()
+ }
+ }
+
+ "removeEventsForCommits" should {
+ "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
+ import GerritEventsTransformations._
+
+ val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
+ val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
+ val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(
+ Seq(
+ aRefUpdatedEvent("oldRev", "RevToExclude1"),
+ toKeep1,
+ aRefUpdatedEvent("oldRev", "RevToExclude2"),
+ aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
+ toKeep2
+ ).map(_.event))
+
+ events
+ .removeEventsForCommits(
+ sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
+ .collect() should contain only (toKeep1.event, toKeep2.event)
+ }
+ }
+}
+
+trait EventFixture {
+
+ // Forcing early type failures
+ case class JsonEvent[T <: GerritJsonEvent](json: String) {
+ val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
+ }
+
+ val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(
+ oldRev = "863b64002f2a9922deba69407804a44703c996e0",
+ newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
+
+ val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent(
+ "I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
+
+ def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) =
+ JsonEvent[RefUpdatedEvent](
+ s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "refUpdate":{"oldRev":"$oldRev",
+ | "newRev":"$newRev",
+ | "refName": "refs/heads/master","project":"subcut"},
+ |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
+
+ def aChangeMergedEvent(changeId: String,
+ createdOnInSecs: Long = 1000l,
+ newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
+ isMergeCommit: Boolean = false,
+ insertions: Integer = 0,
+ deletions: Integer = 0,
+ branch: String = "master") =
+ JsonEvent[ChangeMergedEvent](
+ s"""{
+ |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ |"newRev":"$newRev",
+ |"patchSet":{
+ | "number":1,
+ | "revision":"$newRev",
+ | "parents": ${if (isMergeCommit)
+ """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]"""
+ else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
+ | "ref":"refs/changes/01/1/1",
+ | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "createdOn":1516530259,
+ | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
+ | "isDraft":false,
+ | "kind":"REWORK",
+ | "sizeInsertions":$insertions,
+ | "sizeDeletions":$deletions
+ |},
+ |"change":{
+ | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
+ | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
+ | "createdOn":1516530259,"status":"MERGED"
+ |},
+ |"project":{"name":"subcut"},
+ |"refName":"refs/heads/$branch",
+ |"changeKey":{"id":"$changeId"},
+ |"type":"change-merged",
+ |"eventCreatedOn": $createdOnInSecs
+ |}""".stripMargin)
+
+}
diff --git a/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
similarity index 97%
rename from src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
index e18e328..fe65ec5 100644
--- a/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
import org.scalatest.{FlatSpec, Matchers}
@@ -81,4 +81,4 @@
case _ =>
}
}
-}
\ No newline at end of file
+}
diff --git a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
similarity index 95%
rename from src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
index 591184b..a839430 100644
--- a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
import org.scalatest.{FlatSpec, Matchers}
diff --git a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
similarity index 95%
rename from src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
index 1c9adb8..c8fc602 100644
--- a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
@@ -12,16 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.gerritforge.analytics.support.ops
+package com.gerritforge.analytics.gitcommits.support.ops
import java.time.{LocalDate, LocalDateTime, ZoneOffset}
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
import org.scalatest.{FlatSpec, Matchers}
class AnalyticsTimeOpsSpec extends FlatSpec with Matchers {
-
"String parser - Given a correct string and date format" should "return an epoch value" in {
val epochValueUTC =
LocalDateTime
@@ -49,7 +48,6 @@
stringDate.parseStringToLocalDate(dateFormat).get should equal(utcLocalDate)
}
-
"String parser - An incorrect string a given format" should "return None" in {
val stringDate = "2018-01-01 12:00:00.000000000"
val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
@@ -58,7 +56,6 @@
stringDate.parseStringToUTCEpoch(dateFormat) should equal(None)
}
-
"Simple Date Formats" should "convert to the correct strings - yyyyMMddHH" in {
val epochValueUTC =
LocalDateTime
@@ -103,7 +100,6 @@
AnalyticsDateTimeFormater.yyyy.format(epochValueUTC) should equal(yyyyStr)
}
-
"UTC conversion" should "check date operations return always UTC" in {
val dateTime =
LocalDateTime