Use gerrit events to calculate UserActivitySummary
The code reads the events generated by the events plugin and exported to the
file system, and produces the same DataFrame structure as the one built by
importing data from the analytics plugin.
Change-Id: I664cd4291f2674181a570967fc813e8f7615b84e
Jira-Id: GERICS-634
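
For reviewers, the new flow in Main.buildProjectStats below, condensed into a
sketch (error handling and the 'until' override omitted; illustrative, not part
of the patch):

```scala
// parse the exported JSON lines into either failures or typed events
val events = loadEvents // RDD[Either[NotParsableJsonEvent, GerritJsonEvent]]
// keep only the events that add a new revision to a ref
val revisionEvents = events.collect { case Right(e) => e }.repositoryWithNewRevisionEvents
// aggregate them, excluding commits already counted by the analytics plugin,
// then union the two identically-shaped DataFrames
val statsFromEvents = getContributorStatsFromGerritEvents(
  revisionEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
val allStats = statsFromAnalyticsPlugin union statsFromEvents
```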
diff --git a/README.md b/README.md
index 7fc13ee..6205ee7 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,8 @@
--since 2000-06-01 \
--aggregate email_hour \
--url http://gerrit.mycompany.com \
+ --events file:///tmp/gerrit-events-export.json \
+ --writeNotProcessedEventsTo file:///tmp/failed-events \
-e gerrit/analytics
```
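
A note on the new --writeNotProcessedEventsTo output: the job writes it as a
tab-separated file with a header (see the write options in Main.scala further
down), so it can be loaded back for inspection. A sketch, assuming the example
path above:

```scala
// a sketch, not part of the patch: reading back the failed-events TSV
val failed = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("file:///tmp/failed-events")
failed.show(truncate = false) // columns: source, failureDescription
```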
@@ -31,6 +33,12 @@
the system temporary directory
- -a --email-aliases (*optional*) "emails to author alias" input data path.
+- --events (*optional*) location from where to load the Gerrit events.
+ If not specified, events will be ignored
+- --writeNotProcessedEventsTo (*optional*) location where to write a TSV file containing the events we couldn't process,
+ with a description of the reason why
+
+
CSVs with 3 columns are expected in input.
Here an example of the required files structure:
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 846fb36..b52540f 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -89,6 +89,19 @@
val schema = Encoders.product[UserActivitySummary].schema
+ /**
+ * Assumes the data frame contains a 'commits' column with an array of CommitInfo in it
+ * and returns a Dataset[String] with the distinct commit SHA1s
+ */
+ def extractCommits(df: DataFrame)(implicit spark: SparkSession) : Dataset[String] = {
+ import spark.implicits._
+
+ df
+ .select(explode($"commits.sha1"))
+ .as[String]
+ .distinct() // distinct() may be redundant here, but it guarantees the contract of returning unique SHA1s
+ }
+
implicit class PimpedDataFrame(val df: DataFrame) extends AnyVal {
def transformCommitterInfo()(implicit spark: SparkSession): DataFrame = {
import org.apache.spark.sql.functions.from_json
@@ -100,7 +113,7 @@
"json.num_files as num_files", "json.num_distinct_files as num_distinct_files",
"json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
"json.num_commits as num_commits", "json.last_commit_date as last_commit_date",
- "json.is_merge as is_merge"
+ "json.is_merge as is_merge", "json.commits as commits"
)
}
@@ -127,15 +140,23 @@
df.withColumn(columnName, longDateToISOUdf(col(columnName)))
}
+ def dropCommits(implicit spark: SparkSession): DataFrame = {
+ df.drop("commits")
+ }
+
def addOrganization()(implicit spark: SparkSession): DataFrame =
df.withColumn("organization", emailToDomainUdf(col("email")))
+ def commitSet(implicit spark: SparkSession) : Dataset[String] = {
+ extractCommits(df)
+ }
def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession) : DataFrame = {
df
.addOrganization()
.handleAliases(aliasesDFMaybe)
.convertDates("last_commit_date")
+ .dropCommits
}
}
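
Taken together, the new 'commits' column is only a vehicle: transformCommitterInfo
carries it through, commitSet extracts the SHA1s so event-based stats can exclude
commits the analytics plugin already counted, and dashboardStats drops it again
before the dashboard output. A sketch of that lifecycle (illustrative, not part
of the patch):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._

// df is assumed to be the output of transformCommitterInfo
def splitForDeduplication(df: DataFrame)(implicit spark: SparkSession): (Dataset[String], DataFrame) = {
  val alreadyCounted = df.commitSet       // distinct SHA1s from the 'commits' column
  val dashboard = df.dashboardStats(None) // the 'commits' column is dropped here
  (alreadyCounted, dashboard)
}
```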
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
new file mode 100644
index 0000000..9b5990a
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
@@ -0,0 +1,84 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine.events
+
+import java.text.{DateFormat, SimpleDateFormat}
+import java.time.{LocalDateTime, ZoneOffset}
+
+import scala.util.Try
+
+sealed trait AggregationStrategy extends Serializable {
+ def aggregationKey(event: GerritJsonEvent): String
+
+ case class DateTimeParts(year: Integer, month: Integer, day: Integer, hour: Integer)
+
+ def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts = {
+ val date = LocalDateTime.ofEpochSecond(event.eventCreatedOn, 0, ZoneOffset.UTC)
+ DateTimeParts(date.getYear, date.getMonthValue, date.getDayOfMonth, date.getHour)
+ }
+}
+
+object AggregationStrategy {
+
+ @throws(classOf[IllegalArgumentException])
+ def byName(aggregationName: String): Try[AggregationStrategy] = Try {
+ aggregationName.toUpperCase match {
+ case "EMAIL" => aggregateByEmail
+ case "EMAIL_YEAR" => aggregateByEmailAndYear
+ case "EMAIL_MONTH" => aggregateByEmailAndMonth
+ case "EMAIL_DAY" => aggregateByEmailAndDay
+ case "EMAIL_HOUR" => aggregateByEmailAndHour
+ case unsupported =>
+ throw new IllegalArgumentException(s"Unsupported aggregation '$aggregationName'")
+ }
+ }
+
+ object aggregateByEmail extends AggregationStrategy {
+ override def aggregationKey(event: GerritJsonEvent): String = event.account.email
+ }
+
+ trait EmailAndTimeBasedAggregation extends AggregationStrategy {
+ val dateFormat: DateFormat
+
+ final override def aggregationKey(event: GerritJsonEvent): String = {
+ // eventCreatedOn is in epoch seconds, while DateFormat.format expects epoch milliseconds
+ s"${event.account.email}/${dateFormat.format(event.eventCreatedOn * 1000L)}"
+ }
+ }
+
+ object aggregateByEmailAndHour extends EmailAndTimeBasedAggregation {
+ val dateFormat = new SimpleDateFormat("yyyyMMddHH")
+ }
+
+ object aggregateByEmailAndDay extends EmailAndTimeBasedAggregation {
+ val dateFormat = new SimpleDateFormat("yyyyMMdd")
+
+ override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
+ super.decomposeTimeOfAggregatedEvent(event).copy(hour = 0)
+ }
+
+ object aggregateByEmailAndMonth extends EmailAndTimeBasedAggregation {
+ val dateFormat = new SimpleDateFormat("yyyyMM")
+
+ override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
+ super.decomposeTimeOfAggregatedEvent(event).copy(day = 0, hour = 0)
+ }
+
+ object aggregateByEmailAndYear extends EmailAndTimeBasedAggregation {
+ val dateFormat = new SimpleDateFormat("yyyy")
+
+ override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
+ super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
+ }
+}
\ No newline at end of file
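
Resolving a strategy by name then looks like this (a sketch mirroring the
fallback used in Main.buildProjectStats below):

```scala
import com.gerritforge.analytics.engine.events.AggregationStrategy

val strategy: AggregationStrategy = AggregationStrategy
  .byName("email_hour")
  .getOrElse(AggregationStrategy.aggregateByEmail)
// strategy.aggregationKey(event) then yields keys like "admin@example.com/2018012110"
```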
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
new file mode 100644
index 0000000..4b03b72
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
@@ -0,0 +1,189 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine.events
+
+import java.time.{LocalDateTime, ZoneOffset}
+
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
+
+import scala.util.{Failure, Success}
+
+
+object GerritEventsTransformations extends LazyLogging {
+
+ case class NotParsableJsonEvent(source: String, failureDescription: String)
+
+ implicit class PimpedJsonRDD(val self: RDD[String]) extends AnyVal {
+ def parseEvents(implicit eventParser: GerritJsonEventParser): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = {
+ self.map(tryParseGerritTriggeredEvent)
+ }
+ }
+
+ implicit class PimpedGerritJsonEventRDD[T <: GerritJsonEvent](val self: RDD[T]) extends AnyVal {
+ /**
+ * Returns the UTC date and time of the earliest event in the RDD
+ */
+ def earliestEventTime: LocalDateTime = {
+ val earliestEpoch = self.map { event =>
+ event.eventCreatedOn
+ }.min()
+
+ LocalDateTime.ofEpochSecond(earliestEpoch, 0, ZoneOffset.UTC)
+ }
+
+ def repositoryWithNewRevisionEvents: RDD[GerritRefHasNewRevisionEvent] = self.collect { case e: GerritRefHasNewRevisionEvent => e }
+ }
+
+ implicit class PimpedRepositoryModifiedGerritEventRDD(val self: RDD[GerritRefHasNewRevisionEvent]) extends AnyVal {
+ def removeEventsForCommits(commitsToExclude: RDD[String]): RDD[GerritRefHasNewRevisionEvent] = {
+ self
+ .keyBy(_.newRev)
+ .leftOuterJoin(commitsToExclude.keyBy(identity))
+ .flatMap[GerritRefHasNewRevisionEvent] {
+ case (_, (_, Some(_))) => None
+
+ case (_, (event, None)) => Some(event)
+ }
+ }
+
+ def userActivitySummaryPerProject(aggregationStrategy: AggregationStrategy, eventParser: GerritJsonEventParser): RDD[(String, UserActivitySummary)] = {
+ self
+ .collect { case e: ChangeMergedEvent => e }
+ .groupBy { event =>
+ (event.change.project, aggregationStrategy.aggregationKey(event))
+ }.flatMap { case ((project, _), changesPerUserAndTimeWindow) =>
+ val dateParts: aggregationStrategy.DateTimeParts = aggregationStrategy.decomposeTimeOfAggregatedEvent(changesPerUserAndTimeWindow.head)
+
+ // val branchesPerCommitForThisProject = broadcastBranchesPerCommitByProject.value.getOrElse(project, Map.empty)
+
+ extractUserActivitySummary(
+ changesPerUserAndTimeWindow, dateParts.year, dateParts.month, dateParts.day, dateParts.hour
+ ).map(
+ project -> _
+ )
+ }
+ }
+ }
+
+ implicit class PimpedUserActivitySummaryPerProjectRDD(val self: RDD[(String, UserActivitySummary)]) extends AnyVal {
+ def asEtlDataFrame(implicit sqlContext: SQLContext): DataFrame = {
+ convertAsDataFrame(self)
+ }
+ }
+
+ def tryParseGerritTriggeredEvent(eventJson: String)(implicit eventParser: GerritJsonEventParser): Either[NotParsableJsonEvent, GerritJsonEvent] = {
+ eventParser.fromJson(eventJson) match {
+ case Success(event) => Right(event)
+ case Failure(exception) =>
+ logger.warn(s"Unable to parse event '$eventJson'", exception)
+ Left(NotParsableJsonEvent(eventJson, exception.getMessage.replace("\n", " - ")))
+ }
+ }
+
+
+ private def addCommitSummary(summaryTemplate: UserActivitySummary, changes: Iterable[ChangeMergedEvent]): UserActivitySummary = {
+ // We assume all the changes share the same is_merge value
+ summaryTemplate.copy(
+ num_commits = changes.size,
+ commits = changes.map(changeMerged =>
+ CommitInfo(changeMerged.newRev, changeMerged.eventCreatedOn * 1000, changeMerged.patchSet.parents.size > 1)
+ ).toArray,
+ last_commit_date = changes.map(_.eventCreatedOn).max * 1000,
+ added_lines = changes.map(_.patchSet.sizeInsertions).sum,
+ deleted_lines = changes.map(_.patchSet.sizeDeletions).sum,
+ // We cannot calculate anything about files until we export the list of files in the gerrit event
+ num_files = 0,
+ num_distinct_files = 0
+ )
+ }
+
+ /**
+ * Extracts up to two UserActivitySummary objects from the given iterable of ChangeMerged events,
+ * depending on whether there is a mix of merge and non-merge changes
+ */
+ def extractUserActivitySummary(changes: Iterable[ChangeMergedEvent],
+ //commitIdToBranchesMap: Map[String, Set[String]],
+ year: Integer,
+ month: Integer,
+ day: Integer,
+ hour: Integer): Iterable[UserActivitySummary] = {
+
+ changes.headOption.fold(List.empty[UserActivitySummary]) { firstChange =>
+ val name = firstChange.account.name
+ val email = firstChange.account.email
+
+ val summaryTemplate = UserActivitySummary(
+ year = year,
+ month = month,
+ day = day,
+ hour = hour,
+ name = name,
+ email = email,
+ num_commits = 0,
+ num_files = 0,
+ num_distinct_files = 0,
+ added_lines = 0,
+ deleted_lines = 0,
+ commits = Array.empty,
+ last_commit_date = 0l,
+ is_merge = false)
+
+ val (mergeCommits, nonMergeCommits) =
+ changes.foldLeft(List.empty[ChangeMergedEvent], List.empty[ChangeMergedEvent]) { case ((mergeCommits, nonMergeCommits), currentCommit) =>
+ if (currentCommit.patchSet.parents.size > 1) {
+ (currentCommit :: mergeCommits, nonMergeCommits)
+ } else {
+ (mergeCommits, currentCommit :: nonMergeCommits)
+ }
+ }
+
+ List(
+ (summaryTemplate.copy(is_merge = true), mergeCommits.reverse),
+ (summaryTemplate.copy(is_merge = false), nonMergeCommits.reverse)
+ )
+ .filter(_._2.nonEmpty)
+ .map { case (template, commits) =>
+ addCommitSummary(template, commits)
+ }
+ }
+ }
+
+ def convertAsDataFrame(self: RDD[(String, UserActivitySummary)])(implicit sqlContext: SQLContext): DataFrame = {
+ import sqlContext.implicits._
+
+ self.map { case (project, summary) =>
+ (project, summary.name, summary.email, summary.year, summary.month, summary.day, summary.hour, summary.num_files, summary.num_distinct_files,
+ summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits)
+ }.toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
+ "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits")
+ }
+
+ def getContributorStatsFromGerritEvents(events: RDD[GerritRefHasNewRevisionEvent],
+ commitsToExclude: RDD[String],
+ aggregationStrategy: AggregationStrategy)(implicit spark: SparkSession): DataFrame = {
+ implicit val sqlCtx = spark.sqlContext
+
+ events
+ .removeEventsForCommits(commitsToExclude)
+ .userActivitySummaryPerProject(aggregationStrategy, EventParser)
+ .asEtlDataFrame
+ }
+
+}
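
One behavioural note on extractUserActivitySummary: because merge and non-merge
changes are split before summarisation, a single (project, user, time window)
group can emit two rows. Illustratively (mixedEvents is a hypothetical
Iterable[ChangeMergedEvent] containing both kinds; see the spec below for
concrete fixtures):

```scala
// hypothetical input: mixedEvents mixes merge and non-merge ChangeMergedEvents
val summaries = GerritEventsTransformations.extractUserActivitySummary(
  changes = mixedEvents, year = 2018, month = 1, day = 10, hour = 1)
// yields two summaries: one with is_merge = true, one with is_merge = false,
// each carrying only the commits of its kind
```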
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/model.scala b/src/main/scala/com/gerritforge/analytics/engine/events/model.scala
new file mode 100644
index 0000000..15956a6
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/model.scala
@@ -0,0 +1,179 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine.events
+
+import com.typesafe.scalalogging.LazyLogging
+import scala.util.{Failure, Success, Try}
+
+// for documentation about this object model see https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html
+
+trait GerritJsonEventParser extends Serializable {
+ def fromJson(json: String): Try[GerritJsonEvent]
+}
+
+
+object EventParser extends GerritJsonEventParser with LazyLogging {
+ val ChangeMergedEventType = "change-merged"
+ val RefUpdatedEventType = "ref-updated"
+
+
+ override def fromJson(json: String): Try[GerritJsonEvent] = {
+ import org.json4s._
+ import org.json4s.native.JsonMethods._
+
+ implicit val formats: DefaultFormats.type = DefaultFormats
+
+ def tryParse[T](jvalue: JValue)(implicit formats: Formats, mf: scala.reflect.Manifest[T]): Try[T] = {
+ try {
+ Success(jvalue.extract[T])
+ } catch {
+ case e: MappingException =>
+ Failure(MappingException(s"Invalid event of type `${mf.runtimeClass.getName}`: ${e.getMessage}", e))
+ }
+ }
+
+ Try(parse(json)).flatMap { parsedJson =>
+ parsedJson \ "type" match {
+ case JString(ChangeMergedEventType) =>
+ tryParse[ChangeMergedEvent](parsedJson)
+
+ case JString(RefUpdatedEventType) =>
+ tryParse[RefUpdatedEvent](parsedJson)
+
+ case JNothing =>
+ Failure(new IllegalArgumentException("Invalid JSON object received, missing 'type' field"))
+
+ case JString(unsupportedType) =>
+ Failure(new IllegalArgumentException(s"Unsupported event type '$unsupportedType'"))
+
+ case unexpectedJsonType =>
+ Failure(new IllegalArgumentException(s"Invalid JSON format for field 'type' `${unexpectedJsonType.getClass.getName}`"))
+ }
+ }
+ }
+}
+
+case class GerritAccount(name: String, email: String, username: String)
+object GerritAccount {
+ val NoAccountInfo = GerritAccount("", "", "")
+}
+
+case class GerritComment(reviewer: GerritAccount, message: String)
+
+case class GerritChange(project: String,
+ branch: String,
+ topic: Option[String],
+ id: String,
+ number: String,
+ subject: String,
+ commitMessage: String,
+ owner: GerritAccount,
+ url: String,
+ createdOn: Long,
+ lastUpdated: Option[Long],
+ comments: List[GerritComment])
+
+case class GerritApproval(
+ `type`: String,
+ value: String,
+ updated: Boolean = false,
+ oldValue: String,
+ by: GerritAccount
+ )
+
+
+case class GerritPatchSet(
+ number: String,
+ revision: String,
+ ref: String,
+ draft: Option[Boolean] ,
+ kind: String,
+ uploader: GerritAccount,
+ author: GerritAccount,
+ approvals: List[GerritApproval],
+ parents: List[String],
+ createdOn: Long,
+ sizeInsertions: Int,
+ sizeDeletions: Int
+ ) {
+ def isDraft : Boolean = draft.getOrElse(false)
+}
+
+
+sealed trait GerritJsonEvent {
+ def `type`: String
+
+ def eventCreatedOn: Long
+
+ def account: GerritAccount
+}
+
+
+sealed trait GerritRepositoryModifiedEvent extends GerritJsonEvent {
+ def modifiedProject: String
+
+ def modifiedRef: String
+}
+
+sealed trait GerritRefHasNewRevisionEvent extends GerritRepositoryModifiedEvent {
+ def newRev: String
+}
+
+sealed trait GerritChangeBasedEvent extends GerritRefHasNewRevisionEvent {
+ def change: GerritChange
+
+ def patchSet: GerritPatchSet
+
+ //def files: Set[String]
+}
+
+case class GerritChangeKey(id: String, `type`: Option[String], eventCreatedOn: Option[Long])
+
+//https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html#_change_merged
+case class ChangeMergedEvent(
+ override val change: GerritChange,
+ override val patchSet: GerritPatchSet,
+ submitter: GerritAccount,
+ override val newRev: String,
+ override val eventCreatedOn: Long,
+ changeKey: GerritChangeKey
+ ) extends GerritChangeBasedEvent {
+ override val `type`: String = "change-merged"
+
+ override def account: GerritAccount = submitter
+
+ def modifiedProject: String = change.project
+
+ def modifiedRef: String = patchSet.ref
+}
+
+case class GerritRefUpdate(project: String, refName: String, oldRev: String, newRev: String)
+
+
+case class RefUpdatedEvent(
+ refUpdate: GerritRefUpdate,
+ submitter: Option[GerritAccount],
+ override val eventCreatedOn: Long
+ ) extends GerritRefHasNewRevisionEvent {
+ override val `type`: String = "ref-updated"
+
+ override def account: GerritAccount = submitter.getOrElse(GerritAccount.NoAccountInfo)
+
+ def modifiedProject: String = refUpdate.project
+
+ def modifiedRef: String = refUpdate.refName
+
+ def newRev: String = refUpdate.newRev
+}
\ No newline at end of file
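
The parser contract in one snippet (the JSON is the ref-updated fixture reused
in GerritEventsTransformationsSpec below; a sketch, not part of the patch):

```scala
import com.gerritforge.analytics.engine.events.EventParser

val parsed = EventParser.fromJson(
  """{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
    |"refUpdate":{"oldRev":"863b64002f2a9922deba69407804a44703c996e0",
    |"newRev":"d3131be8d7c920badd28b70d8c039682568c8de5",
    |"refName":"refs/heads/master","project":"subcut"},
    |"type":"ref-updated","eventCreatedOn":1000}""".stripMargin)
// parsed: Success(RefUpdatedEvent(...)); an unknown "type" yields a Failure instead
```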
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index 1ce5b73..d52b135 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -17,15 +17,19 @@
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, ZoneId}
+import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import scopt.Read.reads
import scopt.{OptionParser, Read}
import scala.io.{Codec, Source}
import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
object Main extends App with Job with LazyLogging {
@@ -74,6 +78,12 @@
opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
c.copy(emailAlias = Some(path))
} text "\"emails to author alias\" input data path"
+ opt[String]("events") optional() action { (eventsPath, config) =>
+ config.copy(eventsPath = Some(eventsPath))
+ } text "location where to load the Gerrit Events"
+ opt[String]("writeNotProcessedEventsTo") optional() action{ (failedEventsPath, config) =>
+ config.copy(eventsFailureOutputPath = Some(failedEventsPath))
+ } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
}
cliOptionParser.parse(args, GerritEndpointConfig()) match {
@@ -102,18 +112,76 @@
def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+ import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
implicit val sc: SparkContext = spark.sparkContext
+ val aggregationStrategy: AggregationStrategy =
+ config.aggregate.flatMap { agg =>
+ AggregationStrategy.byName(agg) match {
+ case Success(strategy) => Some(strategy)
+ case Failure(e) =>
+ logger.warn(s"Invalid aggregation strategy '$agg", e)
+ None
+ }
+ }.getOrElse(AggregationStrategy.aggregateByEmail)
+
val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
val aliasesDF = getAliasDF(config.emailAlias)
- getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), config.contributorsUrl)
- .dashboardStats(aliasesDF)
+ val events = loadEvents
+ val failedEvents: RDD[NotParsableJsonEvent] = events.collect { case Left(eventFailureDescription) => eventFailureDescription }
+
+ if(!failedEvents.isEmpty()) {
+ config.eventsFailureOutputPath.foreach { failurePath =>
+ logger.info(s"Events failures will be stored at '$failurePath'")
+
+ import spark.implicits._
+ failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
+ }
+ }
+
+ // We might want to use the time of the events to drive the collection of data from the repository
+ val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
+
+ val firstEventDateMaybe: Option[LocalDate] = if(repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
+
+ val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
+ val lastAggregationDate = firstEventDate.plusMonths(1)
+ if(lastAggregationDate.isBefore(LocalDate.now())) {
+ logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events ara available until $firstEventDate")
+ config.copy(until = Some(lastAggregationDate))
+ } else {
+ config
+ }
+ }
+
+ val statsFromAnalyticsPlugin =
+ getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl)
+
+ val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
+
+ require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
+ s""" Schemas from the stats collected from events and from the analytics datasets differs!!
+ | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+ | From gerrit events: ${statsFromEvents.schema}
+ """.stripMargin)
+
+ (statsFromAnalyticsPlugin union statsFromEvents).dashboardStats(aliasesDF)
+ }
+
+ def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = {
+ import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
+
+ config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent,GerritJsonEvent]]) { eventsPath =>
+ spark
+ .read.textFile(eventsPath).rdd
+ .parseEvents(EventParser)
+ }
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 316847f..899e56a 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -24,7 +24,10 @@
since: Option[LocalDate] = None,
until: Option[LocalDate] = None,
aggregate: Option[String] = None,
- emailAlias: Option[String] = None) {
+ emailAlias: Option[String] = None,
+ eventsPath: Option[String] = None,
+ eventsFailureOutputPath: Option[String] = None
+ ) {
val gerritProjectsUrl: String = s"${baseUrl}/projects/" + prefix.fold("")("?p=" + _)
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index f8c0fab..95b2fe7 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -24,6 +24,7 @@
import org.json4s.jackson.JsonMethods.{compact, render}
import org.scalatest.{FlatSpec, Inside, Matchers}
+import scala.collection.mutable
import scala.io.Source
class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
@@ -102,17 +103,20 @@
df.count should be(3)
val collected = df.collect
- df.schema.fields.map(_.name) should contain inOrder (
+ df.schema.fields.map(_.name) should contain inOrder(
"project", "author", "email",
"year", "month", "day", "hour",
"num_files", "num_distinct_files", "added_lines", "deleted_lines",
"num_commits", "last_commit_date",
- "is_merge")
+ "is_merge", "commits")
collected should contain allOf(
- Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 2, 1, 1, 1, 0, false),
- Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 3, 1, 1, 428, 1500000000000L, true),
- Row("p3", "c", "c@mail.com", null, null, null, null, 4, 2, 1, 1, 12, 1600000000000L, true)
+ Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 2, 1, 1, 1, 0, false,
+ new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false)))),
+ Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 3, 1, 1, 428, 1500000000000L, true,
+ new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true)))),
+ Row("p3", "c", "c@mail.com", null, null, null, null, 4, 2, 1, 1, 12, 1600000000000L, true,
+ new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))))
)
}
@@ -273,6 +277,27 @@
)
}
+ "extractCommitsPerProject" should "generate a Dataset with the all the SHA of commits with associated project" in {
+ import sql.implicits._
+
+ val committerInfo = sc.parallelize(Seq(
+ ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "sha_1", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}] }"""),
+ ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"sha_2", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"sha_3", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}"""),
+ // last commit is missing hour,day,month,year to check optionality
+ ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"sha_4", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"sha_5", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}""")
+ )).toDF("project", "json")
+ .transformCommitterInfo
+
+ committerInfo.commitSet.collect() should contain only(
+ "sha_1",
+ "sha_2",
+ "sha_3",
+ "sha_4",
+ "sha_5"
+ )
+
+ }
+
private def newSource(contributorsJson: JObject*): String = {
val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
s"${getClass.getName}-${System.nanoTime()}")
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
new file mode 100644
index 0000000..bde0705
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
@@ -0,0 +1,259 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.engine.events
+
+import java.time.format.DateTimeFormatter
+import java.time.temporal.ChronoField.{MILLI_OF_SECOND, NANO_OF_SECOND}
+import java.time.{ZoneId, ZonedDateTime}
+
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
+import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import org.apache.spark.rdd.RDD
+import org.scalatest.{Inside, Matchers, WordSpec}
+
+class GerritEventsTransformationsSpec extends WordSpec with Matchers with SparkTestSupport with Inside with EventFixture {
+
+ "tryParseGerritTriggeredEvent" should {
+
+ implicit val eventParser: GerritJsonEventParser = EventParser
+
+ "Parse a correctly formed event" in new EventFixture {
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(refUpdated.event)
+ }
+
+ "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
+ val invalidJson = "invalid json string"
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
+ }
+
+ "Return a description of the failure with the original event source in a Left object if the JSON event is not supported" in {
+ val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
+
+ GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
+ }
+ }
+
+ "PimpedJsonRDD" should {
+ "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
+ val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
+
+ import GerritEventsTransformations._
+
+ jsonRdd.parseEvents(EventParser).collect() should contain only(
+ Right(refUpdated.event),
+ Right(changeMerged.event),
+ Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
+ )
+ }
+ }
+
+ "extractUserActivitySummary" should {
+ implicit val eventParser: GerritJsonEventParser = EventParser
+
+ "Build one UserActivitySummary object if given a series of non-merge commits" in {
+ val events: Seq[ChangeMergedEvent] = Seq(
+ aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1),
+ aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0),
+ aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4),
+ aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2),
+ aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1)
+ ).map(_.event)
+
+ val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
+ changes = events,
+ year = 2018, month = 1, day = 10, hour = 1
+ )
+
+ summaries should have size 1
+
+ inside(summaries.head) {
+ case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, last_commit_date, is_merge) =>
+ year shouldBe 2018
+ month shouldBe 1
+ day shouldBe 10
+ hour shouldBe 1
+ name shouldBe "Administrator"
+ email shouldBe "admin@example.com"
+ num_commits shouldBe events.size
+ last_commit_date shouldBe 1005000l
+ is_merge shouldBe false
+ added_lines shouldBe 7
+ deleted_lines shouldBe 8
+ commits should contain only(
+ CommitInfo("rev1", 1001000l, false),
+ CommitInfo("rev2", 1002000l, false),
+ CommitInfo("rev3", 1003000l, false),
+ CommitInfo("rev4", 1004000l, false),
+ CommitInfo("rev5", 1005000l, false)
+ )
+ }
+ }
+
+ "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
+ val events: Seq[ChangeMergedEvent] = Seq(
+ aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
+ aChangeMergedEvent("2", 1002l, newRev = "rev2"),
+ aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
+ aChangeMergedEvent("4", 1004l, newRev = "rev4"),
+ aChangeMergedEvent("5", 1005l, newRev = "rev5")
+ ).map(_.event)
+
+ val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
+ changes = events,
+ year = 2018, month = 1, day = 10, hour = 1
+ )
+
+ summaries should have size 2
+
+ summaries.foreach { summary =>
+ inside(summary) {
+ case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _) =>
+ year shouldBe 2018
+ month shouldBe 1
+ day shouldBe 10
+ hour shouldBe 1
+ name shouldBe "Administrator"
+ email shouldBe "admin@example.com"
+ }
+ }
+
+ summaries.foreach { summary =>
+ inside(summary) {
+ case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, false) =>
+ num_commits shouldBe 3
+ last_commit_date shouldBe 1005000l
+ commits should contain only(
+ CommitInfo("rev2", 1002000l, false),
+ CommitInfo("rev4", 1004000l, false),
+ CommitInfo("rev5", 1005000l, false)
+ )
+
+ case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, last_commit_date, true) =>
+ num_commits shouldBe 2
+ last_commit_date shouldBe 1003000l
+ commits should contain only(
+ CommitInfo("rev1", 1001000l, true),
+ CommitInfo("rev3", 1003000l, true)
+ )
+ }
+ }
+
+ }
+ }
+
+ "Pimped Per Project UserActivitySummary RDD" should {
+ "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
+ import GerritEventsTransformations._
+ import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
+ import spark.implicits._
+
+ val aliasDF = sc.parallelize(Seq(
+ ("stefano_alias", "stefano@galarraga-org.com", "")
+ )).toDF("author", "email", "organization")
+
+ val expectedDate : ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC")).`with`(MILLI_OF_SECOND, 0).`with`(NANO_OF_SECOND, 0)
+
+ val analyticsJobOutput =
+ sc.parallelize(Seq(
+ "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate.toInstant.toEpochMilli, false)),
+ expectedDate.toInstant.toEpochMilli, false)
+ ))
+ .asEtlDataFrame(sql)
+ .addOrganization()
+ .handleAliases(Some(aliasDF))
+ .convertDates("last_commit_date")
+ .dropCommits
+
+ val expected = sc.parallelize(Seq(
+ ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), false, "galarraga-org")
+ )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
+ "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization")
+
+ analyticsJobOutput.collect() should contain theSameElementsAs expected.collect()
+ }
+ }
+
+ "removeEventsForCommits" should {
+ "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
+ import GerritEventsTransformations._
+
+ val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
+ val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
+ val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(Seq(
+ aRefUpdatedEvent("oldRev", "RevToExclude1"),
+ toKeep1,
+ aRefUpdatedEvent("oldRev", "RevToExclude2"),
+ aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
+ toKeep2
+ ).map(_.event))
+
+ events
+ .removeEventsForCommits(sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
+ .collect() should contain only(toKeep1.event, toKeep2.event)
+ }
+ }
+}
+
+trait EventFixture {
+
+ // Forcing early type failures
+ case class JsonEvent[T <: GerritJsonEvent](json: String) {
+ val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
+ }
+
+ val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(oldRev = "863b64002f2a9922deba69407804a44703c996e0", newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
+
+ val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent("I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
+
+ def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) = JsonEvent[RefUpdatedEvent](
+ s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "refUpdate":{"oldRev":"$oldRev",
+ | "newRev":"$newRev",
+ | "refName": "refs/heads/master","project":"subcut"},
+ |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
+
+ def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
+ isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0) = JsonEvent[ChangeMergedEvent](
+ s"""{
+ |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ |"newRev":"$newRev",
+ |"patchSet":{
+ | "number":1,
+ | "revision":"$newRev",
+ | "parents": ${if (isMergeCommit) """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]""" else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
+ | "ref":"refs/changes/01/1/1",
+ | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "createdOn":1516530259,
+ | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
+ | "isDraft":false,
+ | "kind":"REWORK",
+ | "sizeInsertions":$insertions,
+ | "sizeDeletions":$deletions
+ |},
+ |"change":{
+ | "project":"subcut","branch":"master","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
+ | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+ | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
+ | "createdOn":1516530259,"status":"MERGED"
+ |},
+ |"project":{"name":"subcut"},
+ |"refName":"refs/heads/master",
+ |"changeKey":{"id":"$changeId"},
+ |"type":"change-merged",
+ |"eventCreatedOn": $createdOnInSecs
+ |}""".stripMargin)
+
+}
\ No newline at end of file