Reformat with scalafmt
Change-Id: I95b02e03348b26ebb1ef43a872fac13a904c479d
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala
index 4400a70..5399dba 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala
@@ -6,12 +6,15 @@
import scala.util.Try
-case class AdditionalUsersInfo(usersInfo: Map[GerritAccountId,AdditionalUserInfo]) {
- def getUserType(who: Int): String = usersInfo.get(who).map(_.`type`).getOrElse(AdditionalUserInfo.DEFAULT_USER_TYPE)
+case class AdditionalUsersInfo(usersInfo: Map[GerritAccountId, AdditionalUserInfo]) {
+ def getUserType(who: Int): String =
+ usersInfo.get(who).map(_.`type`).getOrElse(AdditionalUserInfo.DEFAULT_USER_TYPE)
}
object AdditionalUsersInfo {
- val empty: AdditionalUsersInfo = AdditionalUsersInfo(Map.empty[GerritAccountId,AdditionalUserInfo])
+ val empty: AdditionalUsersInfo = AdditionalUsersInfo(
+ Map.empty[GerritAccountId, AdditionalUserInfo]
+ )
}
case class AdditionalUserInfo(id: GerritAccountId, `type`: String)
@@ -19,16 +22,19 @@
object AdditionalUserInfo {
val DEFAULT_USER_TYPE = "human"
- def loadAdditionalUserInfo(config: AuditLogETLConfig)(implicit spark: SparkSession): Try[AdditionalUsersInfo] = {
+ def loadAdditionalUserInfo(
+ config: AuditLogETLConfig
+ )(implicit spark: SparkSession): Try[AdditionalUsersInfo] = {
val schema = new StructType()
- .add("id", IntegerType,false)
- .add("type", StringType,false)
+ .add("id", IntegerType, false)
+ .add("type", StringType, false)
import spark.implicits._
Try {
AdditionalUsersInfo(
- config.additionalUserInfoPath.map { path =>
+ config.additionalUserInfoPath
+ .map { path =>
spark.read
.option("header", "true")
.schema(schema)
@@ -38,7 +44,8 @@
.collect
.map(additionalUserInfo => additionalUserInfo.id -> additionalUserInfo)
.toMap
- }.getOrElse(Map.empty[GerritAccountId,AdditionalUserInfo])
+ }
+ .getOrElse(Map.empty[GerritAccountId, AdditionalUserInfo])
)
}
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala
index 6f428e2..b901b17 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala
@@ -26,17 +26,25 @@
import scala.util.{Failure, Success, Try}
case class GerritProject(name: GerritProjectName)
-case class GerritProjects(private val projects: Map[GerritProjectName, GerritProject]) extends LazyLogging with RegexUtil {
+case class GerritProjects(private val projects: Map[GerritProjectName, GerritProject])
+ extends LazyLogging
+ with RegexUtil {
- private val PROJECT_SSH_WITH_SPACES = capture(r = """project:(.+?)\)?\s""")
- private val PROJECT_SSH_WITH_BRACKETS = capture(r = """project:\{\^?(.*)\}""")
- private val PROJECT_SSH_NO_SPACES = capture(r = """project(?::|.)(.*)""")
- private val PROJECT_SSH_PACK = capture(r = """(?:git-receive-pack|git-upload-pack)\.\/(.+)""")
- private val PROJECT_SSH_REPLICATION_START = capture(r = """replication\.start\.([\.\/a-zA-Z0-9]+(?:-[a-zA-Z0-9\.]+)*)(?:\.|\s|$)""")
+ private val PROJECT_SSH_WITH_SPACES = capture(r = """project:(.+?)\)?\s""")
+ private val PROJECT_SSH_WITH_BRACKETS = capture(r = """project:\{\^?(.*)\}""")
+ private val PROJECT_SSH_NO_SPACES = capture(r = """project(?::|.)(.*)""")
+ private val PROJECT_SSH_PACK = capture(r = """(?:git-receive-pack|git-upload-pack)\.\/(.+)""")
+ private val PROJECT_SSH_REPLICATION_START = capture(
+ r = """replication\.start\.([\.\/a-zA-Z0-9]+(?:-[a-zA-Z0-9\.]+)*)(?:\.|\s|$)"""
+ )
private val PROJECT_REST_API_CHANGES_SEGMENT = capture(r = """changes\/([^\/]+)~""")
private val PROJECT_REST_API_PROJECTS_SEGMENT = capture(r = """projects\/([^\/]+)""")
- private val PROJECT_HTTP_PACK_INFO_REF = capture(r = """https?:\/\/(.+)\/info\/refs\?service=(?:git-upload-pack|git-receive-pack)""")
- private val PROJECT_HTTP_PACK = capture(r = """https?:\/\/(.+)\/(?:git-upload-pack|git-receive-pack)""")
+ private val PROJECT_HTTP_PACK_INFO_REF = capture(
+ r = """https?:\/\/(.+)\/info\/refs\?service=(?:git-upload-pack|git-receive-pack)"""
+ )
+ private val PROJECT_HTTP_PACK = capture(
+ r = """https?:\/\/(.+)\/(?:git-upload-pack|git-receive-pack)"""
+ )
private val NO_PROJECT_RELATED_COMMANDS = capture(r = """(LOGIN|LOGOUT|AUTH)""")
@@ -46,17 +54,33 @@
// For example, the string `redhat-performance/quads.github.com.status:open.foo:bar` will match the project
// redhat-performance/quads.github.com, if that project exists in `gerritProject`
private def findProjectStringAtStart(rawProject: String, sep: Char = '.'): Option[String] =
- rawProject.split(sep).foldLeft(List.empty[String]) { case (acc, token) =>
- acc.headOption.map { t => (t + sep + token) +: acc }.getOrElse(List(token))
- }.find(existProject)
+ rawProject
+ .split(sep)
+ .foldLeft(List.empty[String]) {
+ case (acc, token) =>
+ acc.headOption
+ .map { t =>
+ (t + sep + token) +: acc
+ }
+ .getOrElse(List(token))
+ }
+ .find(existProject)
// Helper method to find a project at the end of a string.
// For example, the string `gerrit.foo.bar.baz.redhat-performance/quads.github.com` will match the project
// redhat-performance/quads.github.com, if that project exists in `gerritProject`
private def findProjectStringAtEnd(rawProject: String, sep: Char = '.'): Option[String] =
- rawProject.split(sep).foldRight(List.empty[String]) { case (token, acc) =>
- acc.headOption.map { t => (token + sep + t) +: acc }.getOrElse(List(token))
- }.find(existProject)
+ rawProject
+ .split(sep)
+ .foldRight(List.empty[String]) {
+ case (token, acc) =>
+ acc.headOption
+ .map { t =>
+ (token + sep + t) +: acc
+ }
+ .getOrElse(List(token))
+ }
+ .find(existProject)
def extractProject(what: String, accessPath: String): Option[String] = accessPath match {
case _ if matches(NO_PROJECT_RELATED_COMMANDS, what) =>
@@ -79,7 +103,9 @@
.orElse(extractGroup(PROJECT_HTTP_PACK, what))
.flatMap(findProjectStringAtEnd(_, '/'))
case unexpected =>
- logger.warn(s"Unexpected access path '$unexpected' encountered when extracting project from '$what'")
+ logger.warn(
+ s"Unexpected access path '$unexpected' encountered when extracting project from '$what'"
+ )
None
}
}
@@ -90,14 +116,21 @@
implicit private val formats = DefaultFormats
- def loadProjects(gerritConnectivity: GerritConnectivity, gerritUrl: String): Try[GerritProjects] = {
+ def loadProjects(
+ gerritConnectivity: GerritConnectivity,
+ gerritUrl: String
+ ): Try[GerritProjects] = {
val PAGE_SIZE = 500
logger.debug(s"Loading gerrit projects...")
- val baseUrl = s"""$gerritUrl/projects/?n=$PAGE_SIZE&query=state%3Aactive%20OR%20state%3Aread-only"""
+ val baseUrl =
+ s"""$gerritUrl/projects/?n=$PAGE_SIZE&query=state%3Aactive%20OR%20state%3Aread-only"""
@tailrec
- def loopThroughPages(more: Boolean, triedAcc: Try[GerritProjects] = Success(empty)): Try[GerritProjects] = {
+ def loopThroughPages(
+ more: Boolean,
+ triedAcc: Try[GerritProjects] = Success(empty)
+ ): Try[GerritProjects] = {
if (!more)
triedAcc
else {
@@ -109,16 +142,21 @@
logger.debug(s"Getting gerrit projects - start: ${acc.projects.size}")
val pageInfo = Try(parse(accountsJsonPage)).map { jsMapProjects =>
- val thisPageProjects = jsMapProjects.extract[List[GerritProject]].map(prj => prj.name -> prj)
- (thisPageProjects.size == PAGE_SIZE, acc.copy(projects = acc.projects ++ thisPageProjects))
+ val thisPageProjects =
+ jsMapProjects.extract[List[GerritProject]].map(prj => prj.name -> prj)
+ (
+ thisPageProjects.size == PAGE_SIZE,
+ acc.copy(projects = acc.projects ++ thisPageProjects)
+ )
}
pageInfo match {
- case Success((newMore, newGerritProjects)) => loopThroughPages(newMore, Success(newGerritProjects))
- case Failure(exception) => loopThroughPages(more=false, Failure(exception))
+ case Success((newMore, newGerritProjects)) =>
+ loopThroughPages(newMore, Success(newGerritProjects))
+ case Failure(exception) => loopThroughPages(more = false, Failure(exception))
}
}
}
- loopThroughPages(more=true)
+ loopThroughPages(more = true)
}
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
index 9f040d2..94aba7d 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
@@ -24,14 +24,19 @@
import scala.util.{Failure, Success, Try}
import org.json4s.FieldSerializer._
-
case class GerritUserIdentifiers(private val accounts: Map[GerritAccountId, GerritUserIdentifier]) {
- def getIdentifier(accountId: GerritAccountId): GerritUserIdentifier = accounts.getOrElse(accountId, s"$accountId")
+ def getIdentifier(accountId: GerritAccountId): GerritUserIdentifier =
+ accounts.getOrElse(accountId, s"$accountId")
}
object GerritUserIdentifiers extends LazyLogging {
- private case class GerritAccount(accountId: GerritAccountId, username: Option[String], email: Option[String], name: Option[String]) {
+ private case class GerritAccount(
+ accountId: GerritAccountId,
+ username: Option[String],
+ email: Option[String],
+ name: Option[String]
+ ) {
val getIdentifier: GerritUserIdentifier =
name.getOrElse(
email.getOrElse(
@@ -43,46 +48,53 @@
val empty = GerritUserIdentifiers(Map.empty[GerritAccountId, GerritUserIdentifier])
private val gerritAccountSerializer = FieldSerializer[GerritAccount](
- deserializer=renameFrom(name="_account_id",newName="accountId")
+ deserializer = renameFrom(name = "_account_id", newName = "accountId")
)
implicit val formats: Formats = DefaultFormats + gerritAccountSerializer
- def loadAccounts(gerritConnectivity: GerritConnectivity, gerritUrl: String): Try[GerritUserIdentifiers] = {
+ def loadAccounts(
+ gerritConnectivity: GerritConnectivity,
+ gerritUrl: String
+ ): Try[GerritUserIdentifiers] = {
logger.debug(s"Loading gerrit accounts...")
val baseUrl = s"""$gerritUrl/accounts/?q=name:""&o=details"""
@tailrec
- def loopThroughPages(more: Boolean, triedAcc: Try[GerritUserIdentifiers] = Success(empty)): Try[GerritUserIdentifiers] = {
- if (!more)
- triedAcc
- else {
- val acc = triedAcc.get
+ def loopThroughPages(
+ more: Boolean,
+ triedAcc: Try[GerritUserIdentifiers] = Success(empty)
+ ): Try[GerritUserIdentifiers] = {
+ if (!more)
+ triedAcc
+ else {
+ val acc = triedAcc.get
- val url = baseUrl + s"&start=${ acc.accounts.size}"
- val accountsJsonPage = gerritConnectivity.getContentFromApi(url).dropGerritPrefix.mkString
+ val url = baseUrl + s"&start=${acc.accounts.size}"
+ val accountsJsonPage = gerritConnectivity.getContentFromApi(url).dropGerritPrefix.mkString
- logger.debug(s"Getting gerrit accounts - start: ${acc.accounts.size}")
+ logger.debug(s"Getting gerrit accounts - start: ${acc.accounts.size}")
- val pageInfo = Try(parse(accountsJsonPage)).map { jsListAccounts =>
- val more = (jsListAccounts \ "_more_accounts").extractOrElse(default = false)
+ val pageInfo = Try(parse(accountsJsonPage)).map { jsListAccounts =>
+ val more = (jsListAccounts \ "_more_accounts").extractOrElse(default = false)
- val thisPageAccounts = jsListAccounts
- .extract[List[GerritAccount]]
- .map( ga => ga.accountId -> ga.getIdentifier)
- .toMap
+ val thisPageAccounts = jsListAccounts
+ .extract[List[GerritAccount]]
+ .map(ga => ga.accountId -> ga.getIdentifier)
+ .toMap
- (more, acc.copy(accounts = acc.accounts ++ thisPageAccounts))
- }
-
- pageInfo match {
- case Success((newMore, newGerritAccounts)) => loopThroughPages(newMore, Success(newGerritAccounts))
- case Failure(exception) => loopThroughPages(more=false, Failure(exception))
- }
+ (more, acc.copy(accounts = acc.accounts ++ thisPageAccounts))
}
+
+ pageInfo match {
+ case Success((newMore, newGerritAccounts)) =>
+ loopThroughPages(newMore, Success(newGerritAccounts))
+ case Failure(exception) => loopThroughPages(more = false, Failure(exception))
+ }
+ }
}
- loopThroughPages(more=true)
+ loopThroughPages(more = true)
}
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
index e06aa27..374bbef 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
@@ -15,7 +15,7 @@
package com.gerritforge.analytics.auditlog
package object broadcast {
- type GerritAccountId = Int
- type GerritProjectName = String
+ type GerritAccountId = Int
+ type GerritProjectName = String
type GerritUserIdentifier = String
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 158435d..e3e267e 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -14,7 +14,11 @@
package com.gerritforge.analytics.auditlog.job
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, GerritProjects, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{
+ AdditionalUserInfo,
+ GerritProjects,
+ GerritUserIdentifiers
+}
import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
import com.gerritforge.analytics.auditlog.model._
import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -29,9 +33,12 @@
CommandLineArguments(args) match {
case Some(config) =>
-
val tryProjects = GerritProjects.loadProjects(
- new GerritConnectivity(config.gerritUsername, config.gerritPassword, config.ignoreSSLCert.getOrElse(false)),
+ new GerritConnectivity(
+ config.gerritUsername,
+ config.gerritPassword,
+ config.ignoreSSLCert.getOrElse(false)
+ ),
config.gerritUrl.get
)
@@ -41,7 +48,11 @@
}
val tryUserIdentifiers = GerritUserIdentifiers.loadAccounts(
- new GerritConnectivity(config.gerritUsername, config.gerritPassword, config.ignoreSSLCert.getOrElse(false)),
+ new GerritConnectivity(
+ config.gerritUsername,
+ config.gerritPassword,
+ config.ignoreSSLCert.getOrElse(false)
+ ),
config.gerritUrl.get
)
@@ -52,7 +63,10 @@
val triedAdditionalUserInfo = AdditionalUserInfo.loadAdditionalUserInfo(config)
if (triedAdditionalUserInfo.isFailure) {
- logger.error("Error loading additional user information", triedAdditionalUserInfo.failed.get)
+ logger.error(
+ "Error loading additional user information",
+ triedAdditionalUserInfo.failed.get
+ )
sys.exit(1)
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
index 8b6667c..bd1e7e1 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
@@ -16,15 +16,15 @@
import java.sql.Timestamp
case class AggregatedAuditEvent(
- events_time_bucket: Timestamp,
- audit_type: String,
- user_identifier: Option[String],
- user_type: Option[String],
- access_path: Option[String],
- command: String,
- command_arguments: String,
- sub_command: Option[String],
- project: Option[String],
- result: String,
- num_events: Long
-)
\ No newline at end of file
+ events_time_bucket: Timestamp,
+ audit_type: String,
+ user_identifier: Option[String],
+ user_type: Option[String],
+ access_path: Option[String],
+ command: String,
+ command_arguments: String,
+ sub_command: Option[String],
+ project: Option[String],
+ result: String,
+ num_events: Long
+)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
index d8ab00d..bc2284c 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
@@ -22,7 +22,6 @@
import scala.util.{Failure, Try}
-
sealed trait AuditEvent {
def auditType: String
def accessPath: Option[String]
@@ -45,14 +44,14 @@
}
final case class SshAuditEvent(
- accessPath: Option[String],
- sessionId: String,
- who: Option[Int],
- timeAtStart: Long,
- what: String,
- elapsed: Int,
- uuid: String,
- result: String
+ accessPath: Option[String],
+ sessionId: String,
+ who: Option[Int],
+ timeAtStart: Long,
+ what: String,
+ elapsed: Int,
+ uuid: String,
+ result: String
) extends AuditEvent {
override val auditType = SshAuditEvent.auditType
@@ -82,15 +81,15 @@
}
final case class HttpAuditEvent(
- accessPath: Option[String],
- httpMethod: String,
- result: String,
- sessionId: String,
- who: Option[Int],
- timeAtStart: Long,
- what: String,
- elapsed: Int,
- uuid: String
+ accessPath: Option[String],
+ httpMethod: String,
+ result: String,
+ sessionId: String,
+ who: Option[Int],
+ timeAtStart: Long,
+ what: String,
+ elapsed: Int,
+ uuid: String
) extends BaseHttpAuditEvent {
override val auditType = HttpAuditEvent.auditType
}
@@ -115,15 +114,15 @@
}
final case class ExtendedHttpAuditEvent(
- accessPath: Option[String],
- httpMethod: String,
- result: String,
- sessionId: String,
- who: Option[Int],
- timeAtStart: Long,
- what: String,
- elapsed: Int,
- uuid: String
+ accessPath: Option[String],
+ httpMethod: String,
+ result: String,
+ sessionId: String,
+ who: Option[Int],
+ timeAtStart: Long,
+ what: String,
+ elapsed: Int,
+ uuid: String
) extends BaseHttpAuditEvent {
override val auditType = ExtendedHttpAuditEvent.auditType
}
@@ -152,11 +151,14 @@
def parseRaw(json: String): Try[AuditEvent] = {
Try(parse(json)).flatMap { jsValueEvent =>
jsValueEvent \ "type" match {
- case JString(eventType) if eventType == "HttpAuditEvent" => HttpAuditEvent(jsValueEvent \ "event")
- case JString(eventType) if eventType == "ExtendedHttpAuditEvent" => ExtendedHttpAuditEvent(jsValueEvent \"event")
- case JString(eventType) if eventType == "SshAuditEvent" => SshAuditEvent(jsValueEvent \ "event")
+ case JString(eventType) if eventType == "HttpAuditEvent" =>
+ HttpAuditEvent(jsValueEvent \ "event")
+ case JString(eventType) if eventType == "ExtendedHttpAuditEvent" =>
+ ExtendedHttpAuditEvent(jsValueEvent \ "event")
+ case JString(eventType) if eventType == "SshAuditEvent" =>
+ SshAuditEvent(jsValueEvent \ "event")
case _ => Failure(new MappingException(s"Could not parse json into an audit event: $json"))
}
}
}
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
index b95d81a..20d2396 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
@@ -17,14 +17,14 @@
import java.time.LocalDate
case class AuditLogETLConfig(
- gerritUrl: Option[String] = None,
- gerritUsername: Option[String] = None,
- gerritPassword: Option[String] = None,
- ignoreSSLCert: Option[Boolean] = None,
- elasticSearchIndex: Option[String] = None,
- since: Option[LocalDate] = None,
- until: Option[LocalDate] = None,
- eventsPath: Option[String] = None,
- eventsTimeAggregation: Option[String] = Some("hour"),
- additionalUserInfoPath: Option[String] = None
+ gerritUrl: Option[String] = None,
+ gerritUsername: Option[String] = None,
+ gerritPassword: Option[String] = None,
+ ignoreSSLCert: Option[Boolean] = None,
+ elasticSearchIndex: Option[String] = None,
+ since: Option[LocalDate] = None,
+ until: Option[LocalDate] = None,
+ eventsPath: Option[String] = None,
+ eventsTimeAggregation: Option[String] = Some("hour"),
+ additionalUserInfoPath: Option[String] = None
)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
index dca84ed..021061b 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
@@ -53,19 +53,21 @@
opt[String]('e', "eventsTimeAggregation") optional () action { (input, c) =>
c.copy(eventsTimeAggregation = Some(input))
} text "Events of the same type, produced by the same user will be aggregated with this time granularity: " +
- "'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'"
+ "'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'"
opt[Boolean]('k', "ignoreSSLCert") optional () action { (input, c) =>
c.copy(ignoreSSLCert = Some(input))
} text "Ignore SSL certificate validation (Optional) - Default: false"
- opt[LocalDate]('s', "since") optional () action { (input, c) => c.copy(since = Some(input))
+ opt[LocalDate]('s', "since") optional () action { (input, c) =>
+ c.copy(since = Some(input))
} text "process only auditLogs occurred after (and including) this date, expressed as 'yyyy-MM-dd' (Optional)"
- opt[LocalDate]('u', "until") optional () action { (input, c) => c.copy(until = Some(input))
+ opt[LocalDate]('u', "until") optional () action { (input, c) =>
+ c.copy(until = Some(input))
} text "process only auditLogs occurred before (and including) this date, expressed as 'yyyy-MM-dd' (Optional)"
}
- parser.parse(args, AuditLogETLConfig())
+ parser.parse(args, AuditLogETLConfig())
}
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala
index 83071f0..2ffedc1 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala
@@ -25,9 +25,10 @@
private val maybeSinceMs: Option[Long] = since.map(_.atStartOfDay().convertToUTCEpochMillis)
private val maybeUntilMs: Option[Long] = until.map(_.atStartOfDay().convertToUTCEpochMillis)
- def isWithin(timeMs: Long): Boolean = maybeSinceMs.forall(_ <= timeMs) && maybeUntilMs.forall(_ >= timeMs)
+ def isWithin(timeMs: Long): Boolean =
+ maybeSinceMs.forall(_ <= timeMs) && maybeUntilMs.forall(_ >= timeMs)
}
object TimeRange {
val always = TimeRange(None, None)
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index 14ca58b..04dbb90 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -14,7 +14,11 @@
package com.gerritforge.analytics.auditlog.spark
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{
+ AdditionalUsersInfo,
+ GerritProjects,
+ GerritUserIdentifiers
+}
import com.gerritforge.analytics.auditlog.model.{AggregatedAuditEvent, AuditEvent}
import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -24,16 +28,20 @@
import org.apache.spark.sql.{Dataset, SparkSession}
case class AuditLogsTransformer(
- gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty,
- additionalUsersInfo: AdditionalUsersInfo = AdditionalUsersInfo.empty,
- gerritProjects: GerritProjects = GerritProjects.empty
+ gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty,
+ additionalUsersInfo: AdditionalUsersInfo = AdditionalUsersInfo.empty,
+ gerritProjects: GerritProjects = GerritProjects.empty
)(implicit spark: SparkSession) {
- private val broadcastUserIdentifiers = spark.sparkContext.broadcast(gerritIdentifiers)
+ private val broadcastUserIdentifiers = spark.sparkContext.broadcast(gerritIdentifiers)
private val broadcastAdditionalUsersInfo = spark.sparkContext.broadcast(additionalUsersInfo)
- private val broadcastGerritProjects = spark.sparkContext.broadcast(gerritProjects)
+ private val broadcastGerritProjects = spark.sparkContext.broadcast(gerritProjects)
- def transform(auditEventsRDD: RDD[AuditEvent], timeAggregation: String, timeRange: TimeRange = TimeRange.always): Dataset[AggregatedAuditEvent] = {
+ def transform(
+ auditEventsRDD: RDD[AuditEvent],
+ timeAggregation: String,
+ timeRange: TimeRange = TimeRange.always
+ ): Dataset[AggregatedAuditEvent] = {
import spark.implicits._
auditEventsRDD
.filterWithinRange(TimeRange(timeRange.since, timeRange.until))
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index 9ae3b0c..9cdc721 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -14,47 +14,69 @@
package com.gerritforge.analytics.auditlog.spark.dataframe.ops
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
-import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF, extractSubCommandUDF}
+import com.gerritforge.analytics.auditlog.broadcast.{
+ AdditionalUsersInfo,
+ GerritProjects,
+ GerritUserIdentifiers
+}
+import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{
+ extractCommandArgumentsUDF,
+ extractCommandUDF,
+ extractSubCommandUDF
+}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
-
object DataFrameOps {
implicit class PimpedAuditLogDataFrame(dataFrame: DataFrame) {
private def hasColumn(path: String) = Try(dataFrame(path)).isSuccess
- private def ifExistThenGetOrNull(column: String, targetColumn: => Column) = if (hasColumn(column)) targetColumn else lit(null)
+ private def ifExistThenGetOrNull(column: String, targetColumn: => Column) =
+ if (hasColumn(column)) targetColumn else lit(null)
- def hydrateWithUserIdentifierColumn(userIdentifierCol: String, gerritAccounts: GerritUserIdentifiers): DataFrame = {
- def extractIdentifier: UserDefinedFunction = udf((who: Int) => gerritAccounts.getIdentifier(who))
+ def hydrateWithUserIdentifierColumn(
+ userIdentifierCol: String,
+ gerritAccounts: GerritUserIdentifiers
+ ): DataFrame = {
+ def extractIdentifier: UserDefinedFunction =
+ udf((who: Int) => gerritAccounts.getIdentifier(who))
- dataFrame.withColumn(userIdentifierCol, ifExistThenGetOrNull("who", extractIdentifier(col("who"))))
+ dataFrame.withColumn(
+ userIdentifierCol,
+ ifExistThenGetOrNull("who", extractIdentifier(col("who")))
+ )
}
def withTimeBucketColumn(timeBucketCol: String, timeAggregation: String): DataFrame = {
dataFrame
- .withColumn(timeBucketCol, date_trunc(format=timeAggregation, from_unixtime(col("time_at_start").divide(1000))))
+ .withColumn(
+ timeBucketCol,
+ date_trunc(format = timeAggregation, from_unixtime(col("time_at_start").divide(1000)))
+ )
}
def withCommandColumns(commandCol: String, commandArgsCol: String): DataFrame = {
dataFrame
- .withColumn(commandCol,
+ .withColumn(
+ commandCol,
extractCommandUDF(
col("what"),
col("access_path"),
- ifExistThenGetOrNull("http_method", col("http_method"))))
+ ifExistThenGetOrNull("http_method", col("http_method"))
+ )
+ )
.withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
}
def withSubCommandColumns(subCommandCol: String): DataFrame = {
- dataFrame.withColumn(subCommandCol,
+ dataFrame.withColumn(
+ subCommandCol,
extractSubCommandUDF(
col("what"),
col("access_path")
@@ -63,22 +85,30 @@
}
def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
- def extractProjectUDF: UserDefinedFunction = udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
+ def extractProjectUDF: UserDefinedFunction =
+ udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
dataFrame
.withColumn(projectCol, extractProjectUDF(col("what"), col("access_path")))
}
- def withUserTypeColumn(commandCol: String, additionalUsersInfo: AdditionalUsersInfo): DataFrame = {
- def extractUserType: UserDefinedFunction = udf((who: Int) => additionalUsersInfo.getUserType(who))
+ def withUserTypeColumn(
+ commandCol: String,
+ additionalUsersInfo: AdditionalUsersInfo
+ ): DataFrame = {
+ def extractUserType: UserDefinedFunction =
+ udf((who: Int) => additionalUsersInfo.getUserType(who))
dataFrame.withColumn(commandCol, ifExistThenGetOrNull("who", extractUserType(col("who"))))
}
def aggregateNumEventsColumn(numEventsCol: String, cols: List[String]): DataFrame = {
- dataFrame.groupBy(cols.map(c => col(c)): _*)
- .agg(count("*")
- .alias(numEventsCol))
+ dataFrame
+ .groupBy(cols.map(c => col(c)): _*)
+ .agg(
+ count("*")
+ .alias(numEventsCol)
+ )
}
}
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
index 3887aca..77ac6e4 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
@@ -14,7 +14,11 @@
package com.gerritforge.analytics.auditlog.spark.rdd.ops
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{
+ AdditionalUsersInfo,
+ GerritProjects,
+ GerritUserIdentifiers
+}
import com.gerritforge.analytics.auditlog.model.{AggregatedAuditEvent, AuditEvent}
import com.gerritforge.analytics.auditlog.range.TimeRange
import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
@@ -30,11 +34,11 @@
def toJsonString: RDD[String] = rdd.map(_.toJsonString)
def transformEvents(
- gerritUserIdentifiers: GerritUserIdentifiers,
- additionalUsersInfo: AdditionalUsersInfo,
- gerritProjects: GerritProjects,
- timeAggregation: String,
- timeRange: TimeRange
+ gerritUserIdentifiers: GerritUserIdentifiers,
+ additionalUsersInfo: AdditionalUsersInfo,
+ gerritProjects: GerritProjects,
+ timeAggregation: String,
+ timeRange: TimeRange
)(implicit spark: SparkSession): Dataset[AggregatedAuditEvent] = {
AuditLogsTransformer(gerritUserIdentifiers, additionalUsersInfo, gerritProjects)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala
index 0d221ee..cbf133e 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala
@@ -22,8 +22,7 @@
implicit class PimpedSparkSession(spark: SparkSession) {
def getEventsFromPath(path: String): RDD[AuditEvent] =
- spark
- .read
+ spark.read
.textFile(path)
.rdd
.flatMap(auditString => AuditEvent.parseRaw(auditString).toOption)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
index 3cc9899..d197421 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -38,41 +38,58 @@
val FAILED_SSH_AUTH = "FAILED_SSH_AUTH"
- def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match {
- case "SSH_COMMAND" => extractOrElse(GERRIT_SSH_COMMAND, what, what)
- case "GIT" => extractOrElse(GIT_COMMAND, what, what)
- case "REST_API"|"UNKNOWN" => Option(httpMethod).getOrElse(what)
- case "JSON_RPC" => what
- case null if what == "AUTH" => FAILED_SSH_AUTH
- case unexpected =>
- logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command from '$what'")
- what
- }
+ def extractCommand(what: String, accessPath: String, httpMethod: String = null): String =
+ accessPath match {
+ case "SSH_COMMAND" => extractOrElse(GERRIT_SSH_COMMAND, what, what)
+ case "GIT" => extractOrElse(GIT_COMMAND, what, what)
+ case "REST_API" | "UNKNOWN" => Option(httpMethod).getOrElse(what)
+ case "JSON_RPC" => what
+ case null if what == "AUTH" => FAILED_SSH_AUTH
+ case unexpected =>
+ logger.warn(
+ s"Unexpected access path '$unexpected' encountered when extracting command from '$what'"
+ )
+ what
+ }
- def extractCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String, httpMethod: String) => extractCommand(rawCommand, accessPath, httpMethod))
+ def extractCommandUDF: UserDefinedFunction =
+ udf(
+ (rawCommand: String, accessPath: String, httpMethod: String) =>
+ extractCommand(rawCommand, accessPath, httpMethod)
+ )
def extractCommandArguments(what: String, accessPath: String): Option[String] = accessPath match {
- case "SSH_COMMAND" => extractGroup(GERRIT_SSH_COMMAND_ARGUMENTS, what)
- case "GIT" => Option(extractGroup(GIT_SSH_COMMAND_ARGUMENTS, what).getOrElse(extractOrElse(GIT_HTTP_COMMAND_ARGUMENTS, what, null)))
- case "REST_API"|"UNKNOWN" => Some(what)
+ case "SSH_COMMAND" => extractGroup(GERRIT_SSH_COMMAND_ARGUMENTS, what)
+ case "GIT" =>
+ Option(
+ extractGroup(GIT_SSH_COMMAND_ARGUMENTS, what)
+ .getOrElse(extractOrElse(GIT_HTTP_COMMAND_ARGUMENTS, what, null))
+ )
+ case "REST_API" | "UNKNOWN" => Some(what)
case "JSON_RPC" => None
case null if what == "AUTH" => None
- case unexpected =>
- logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command arguments from '$what'")
+ case unexpected =>
+ logger.warn(
+ s"Unexpected access path '$unexpected' encountered when extracting command arguments from '$what'"
+ )
None
}
- def extractCommandArgumentsUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath))
+ def extractCommandArgumentsUDF: UserDefinedFunction =
+ udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath))
def extractSubCommand(what: String, accessPath: String): Option[String] = accessPath match {
- case "REST_API"|"UNKNOWN" => Some(extractOrElse(REST_API_SUB_COMMAND, what, what))
+ case "REST_API" | "UNKNOWN" => Some(extractOrElse(REST_API_SUB_COMMAND, what, what))
case "SSH_COMMAND" => Some(extractOrElse(SSH_SUB_COMMAND, what, what))
case "GIT" => None
case "JSON_RPC" => None
- case unexpected =>
- logger.warn(s"Unexpected access path '$unexpected' encountered when extracting sub-command from '$what'")
+ case unexpected =>
+ logger.warn(
+ s"Unexpected access path '$unexpected' encountered when extracting sub-command from '$what'"
+ )
None
}
- def extractSubCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractSubCommand(rawCommand, accessPath))
-}
\ No newline at end of file
+ def extractSubCommandUDF: UserDefinedFunction =
+ udf((rawCommand: String, accessPath: String) => extractSubCommand(rawCommand, accessPath))
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala
index f2cbce3..4dbb2c5 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala
@@ -20,8 +20,10 @@
def matches(regex: Regex, string: String): Boolean = regex.findFirstIn(string).isDefined
- def extractOrElse(rx: Regex, target: String, default: String): String = extractGroup(rx, target).getOrElse(default)
+ def extractOrElse(rx: Regex, target: String, default: String): String =
+ extractGroup(rx, target).getOrElse(default)
- def extractGroup(rx: Regex, target: String): Option[String] = rx.findAllMatchIn(target).toList.headOption.map(_.group("capture"))
+ def extractGroup(rx: Regex, target: String): Option[String] =
+ rx.findAllMatchIn(target).toList.headOption.map(_.group("capture"))
}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
index 0ee35e6..86e3f48 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -17,18 +17,28 @@
import com.gerritforge.analytics.SparkTestSupport
import com.gerritforge.analytics.auditlog.broadcast._
-import com.gerritforge.analytics.auditlog.model.{AggregatedAuditEvent, ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
+import com.gerritforge.analytics.auditlog.model.{
+ AggregatedAuditEvent,
+ ElasticSearchFields,
+ HttpAuditEvent,
+ SshAuditEvent
+}
import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
import com.gerritforge.analytics.support.ops.CommonTimeOperations._
import org.scalatest.{FlatSpec, Matchers}
-class AuditLogsTransformerSpec extends FlatSpec with Matchers with SparkTestSupport with TestFixtures {
+class AuditLogsTransformerSpec
+ extends FlatSpec
+ with Matchers
+ with SparkTestSupport
+ with TestFixtures {
behavior of "AuditLogsTransformer"
it should "process an anonymous http audit entry" in {
val events = Seq(anonymousHttpAuditEvent)
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -50,7 +60,8 @@
it should "process an authenticated http audit entry where gerrit account couldn't be identified" in {
val events = Seq(authenticatedHttpAuditEvent)
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -70,12 +81,13 @@
}
it should "process an authenticated http audit entry where gerrit account could be identified" in {
- val events = Seq(authenticatedHttpAuditEvent)
+ val events = Seq(authenticatedHttpAuditEvent)
val gerritUserIdentifier = "Antonio Barone"
val aggregatedEventsDS =
- AuditLogsTransformer(GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)))
- .transform(sc.parallelize(events), timeAggregation="hour")
+ AuditLogsTransformer(
+ GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier))
+ ).transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -97,7 +109,8 @@
it should "process an SSH audit entry" in {
val events = Seq(sshAuditEvent)
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -119,7 +132,8 @@
it should "group ssh events from the same user together, if they fall within the same time bucket (hour)" in {
val events = Seq(sshAuditEvent, sshAuditEvent.copy(timeAtStart = timeAtStart + 1000))
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -140,9 +154,10 @@
it should "group ssh events from different users separately, even if they fall within the same time bucket (hour)" in {
val user2Id = sshAuditEvent.who.map(_ + 1)
- val events = Seq(sshAuditEvent, sshAuditEvent.copy(who=user2Id))
+ val events = Seq(sshAuditEvent, sshAuditEvent.copy(who = user2Id))
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -177,9 +192,13 @@
}
it should "group different event types separately, event if they fall within the same time bucket (hour)" in {
- val events = Seq(sshAuditEvent, authenticatedHttpAuditEvent.copy(timeAtStart = sshAuditEvent.timeAtStart + 1000))
+ val events = Seq(
+ sshAuditEvent,
+ authenticatedHttpAuditEvent.copy(timeAtStart = sshAuditEvent.timeAtStart + 1000)
+ )
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
@@ -216,31 +235,36 @@
it should "process user type" in {
val events = Seq(authenticatedHttpAuditEvent)
- val userType = "nonDefaultUserType"
+ val userType = "nonDefaultUserType"
val additionalUserInfo = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)
- val aggregatedEventsDS = AuditLogsTransformer(additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))).transform(
- auditEventsRDD = sc.parallelize(events),
- timeAggregation = "hour"
+ val aggregatedEventsDS = AuditLogsTransformer(
+ additionalUsersInfo =
+ AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
+ ).transform(
+ auditEventsRDD = sc.parallelize(events),
+ timeAggregation = "hour"
)
aggregatedEventsDS.collect.length shouldBe 1
aggregatedEventsDS.collect.head.user_type should contain(userType)
}
it should "process user type when gerrit account could be identified" in {
- val events = Seq(authenticatedHttpAuditEvent)
+ val events = Seq(authenticatedHttpAuditEvent)
val gerritUserIdentifier = "Antonio Barone"
- val userType = "nonDefaultUserType"
+ val userType = "nonDefaultUserType"
val additionalUserInfo = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)
val aggregatedEventsDS =
AuditLogsTransformer(
- gerritIdentifiers = GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)),
- additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
+ gerritIdentifiers =
+ GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)),
+ additionalUsersInfo =
+ AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
).transform(
- auditEventsRDD = sc.parallelize(events),
- timeAggregation = "hour"
+ auditEventsRDD = sc.parallelize(events),
+ timeAggregation = "hour"
)
aggregatedEventsDS.collect.length shouldBe 1
@@ -250,8 +274,9 @@
it should "extract gerrit project from an http event" in {
val events = Seq(authenticatedHttpAuditEvent)
- val aggregatedEventsDS = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
- .transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
+ .transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.collect.length shouldBe 1
aggregatedEventsDS.collect.head.project should contain(project)
@@ -260,8 +285,9 @@
it should "extract gerrit project from an ssh event" in {
val events = Seq(sshAuditEvent)
- val aggregatedEventsDS = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
- .transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
+ .transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.collect.length shouldBe 1
aggregatedEventsDS.collect.head.project should contain(project)
@@ -270,7 +296,8 @@
it should "extract sub-command" in {
val events = Seq(sshAuditEvent.copy(what = "aCommand.aSubCommand"))
- val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+ val aggregatedEventsDS =
+ AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")
aggregatedEventsDS.collect.length shouldBe 1
aggregatedEventsDS.collect.head.sub_command should contain("aSubCommand")
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
index a69ac29..0a19f72 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
@@ -17,30 +17,49 @@
trait TestFixtures {
- val userId = 123
- val sessionId = "someSessionId"
+ val userId = 123
+ val sessionId = "someSessionId"
val gitAccessPath = "GIT"
- val timeAtStart = 1544802407000L
- val elapsed = 12
- val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
- val project = "Mirantis/tcp-qa"
+ val timeAtStart = 1544802407000L
+ val elapsed = 12
+ val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
+ val project = "Mirantis/tcp-qa"
val GIT_UPLOAD_PACK = "git-upload-pack"
val httpMethod = "GET"
val httpStatus = "200"
- val httpWhat=s"https://review.gerrithub.io/$project/$GIT_UPLOAD_PACK"
+ val httpWhat = s"https://review.gerrithub.io/$project/$GIT_UPLOAD_PACK"
- val anonymousHttpAuditEvent = HttpAuditEvent(Some(gitAccessPath), httpMethod, httpStatus, sessionId, None, timeAtStart, httpWhat, elapsed, uuid)
- val authenticatedHttpAuditEvent: HttpAuditEvent = anonymousHttpAuditEvent.copy(who=Some(userId))
+ val anonymousHttpAuditEvent = HttpAuditEvent(
+ Some(gitAccessPath),
+ httpMethod,
+ httpStatus,
+ sessionId,
+ None,
+ timeAtStart,
+ httpWhat,
+ elapsed,
+ uuid
+ )
+ val authenticatedHttpAuditEvent: HttpAuditEvent = anonymousHttpAuditEvent.copy(who = Some(userId))
- val sshAccessPath = "SSH_COMMAND"
- val sshResult = "0"
- val SSH_GERRIT_COMMAND = "gerrit"
+ val sshAccessPath = "SSH_COMMAND"
+ val sshResult = "0"
+ val SSH_GERRIT_COMMAND = "gerrit"
val SSH_GERRIT_COMMAND_ARGUMENTS = s"query.--format.json.--current-patch-set.project:$project"
- val sshWhat = s"$SSH_GERRIT_COMMAND.$SSH_GERRIT_COMMAND_ARGUMENTS"
+ val sshWhat = s"$SSH_GERRIT_COMMAND.$SSH_GERRIT_COMMAND_ARGUMENTS"
- val sshAuditEvent = SshAuditEvent(Some(sshAccessPath), sessionId, Some(userId), timeAtStart, sshWhat, elapsed, uuid, sshResult)
+ val sshAuditEvent = SshAuditEvent(
+ Some(sshAccessPath),
+ sessionId,
+ Some(userId),
+ timeAtStart,
+ sshWhat,
+ elapsed,
+ uuid,
+ sshResult
+ )
}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala
index aed1980..c6f13b7 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala
@@ -21,22 +21,22 @@
behavior of "extract project from SSH commands"
it should "extract nothing where project could not be found in existing projects" in {
- val project = "spdk/spdk.github.io"
- val what = s"gerrit.query.--format.json.--current-patch-set.project:$project"
+ val project = "spdk/spdk.github.io"
+ val what = s"gerrit.query.--format.json.--current-patch-set.project:$project"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
}
it should "extract nothing where command does not contain project" in {
- val what = s"gerrit.command.not.targeting.any.project"
+ val what = s"gerrit.command.not.targeting.any.project"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
}
it should "extract nothing where command is LOGIN" in {
- val what = s"LOGIN"
+ val what = s"LOGIN"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
@@ -44,48 +44,58 @@
it should "extract a project from a gerrit query command, where the project is delimited by a space" in {
val project = "spdk/spdk.github.io"
- val what = s"gerrit.query.--format.json.--current-patch-set.project:$project commit:7f4e8237114e957e727707ab6549d482d40d7c92 NOT is:draft"
+ val what =
+ s"gerrit.query.--format.json.--current-patch-set.project:$project commit:7f4e8237114e957e727707ab6549d482d40d7c92 NOT is:draft"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from a gerrit query command, where the project is wrapped by curly brackets" in {
val project = "dalihub/dali-scene-template"
- val what = s"gerrit.query.project:{^$project}.--format=JSON.--all-approvals.--comments.--all-reviewers"
+ val what =
+ s"gerrit.query.project:{^$project}.--format=JSON.--all-approvals.--comments.--all-reviewers"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from a gerrit query command, where the project is dot '.' joined with its arguments" in {
- val project = "redhat-performance/quads"
- val what = s"gerrit.query.--format=JSON.project:$project.status:open"
+ val project = "redhat-performance/quads"
+ val what = s"gerrit.query.--format=JSON.project:$project.status:open"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from a gerrit index command" in {
- val project = "Accumbo/iOS"
- val what = s"gerrit.index.project.$project"
+ val project = "Accumbo/iOS"
+ val what = s"gerrit.index.project.$project"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
@@ -94,7 +104,8 @@
val specificProject = s"spdk/spdk.gerrithub.io"
- val what = s"""gerrit.query.--format.json.--current-patch-set.(project:$specificProject) commit:"d2da91b1878f7b7ccd3f30f766ffa2f35cc2011d" NOT is:draft"""
+ val what =
+ s"""gerrit.query.--format.json.--current-patch-set.(project:$specificProject) commit:"d2da91b1878f7b7ccd3f30f766ffa2f35cc2011d" NOT is:draft"""
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) should contain(specificProject)
@@ -105,64 +116,68 @@
val specificProject = s"$genericProject.github.com"
- val what = s"gerrit.query.--format=JSON.project:$specificProject.status:open"
+ val what = s"gerrit.query.--format=JSON.project:$specificProject.status:open"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- genericProject -> GerritProject(genericProject),
- specificProject -> GerritProject(specificProject)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ genericProject -> GerritProject(genericProject),
+ specificProject -> GerritProject(specificProject)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(specificProject)
}
it should "extract a project from a receive pack command" in {
- val project = "att-comdev/prometheus-openstack-exporter"
- val what = s"git-receive-pack./$project"
+ val project = "att-comdev/prometheus-openstack-exporter"
+ val what = s"git-receive-pack./$project"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from an upload pack command" in {
- val project = "dalihub/dali-toolkit"
- val what = s"git-upload-pack./$project"
+ val project = "dalihub/dali-toolkit"
+ val what = s"git-upload-pack./$project"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from a replication start command that has some arguments" in {
- val project = "singh4java/springboot"
- val what = s"replication.start.$project.--wait.--now"
+ val project = "singh4java/springboot"
+ val what = s"replication.start.$project.--wait.--now"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from a replication start command that has no arguments" in {
- val project = "singh4java/springboot.github.com"
- val what = s"replication.start.$project"
+ val project = "singh4java/springboot.github.com"
+ val what = s"replication.start.$project"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
}
it should "extract nothing when replication start refers to no project" in {
- val what = s"replication.start.--help"
+ val what = s"replication.start.--help"
val accessPath = "SSH_COMMAND"
GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
}
it should "extract a project when is found in at the end of the command, even though the word 'project' is not part of the command" in {
- val project = "Sonos-Inc/old97s"
- val what = s"gerrit.query.--format=JSON.--current-patch-set.$project"
+ val project = "Sonos-Inc/old97s"
+ val what = s"gerrit.query.--format=JSON.--current-patch-set.$project"
val accessPath = "SSH_COMMAND"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
@@ -170,16 +185,16 @@
behavior of "REST API calls"
it should "extract the decoded project name from a /changes/ url" in {
- val project = "true-wealth/b2b"
- val what = s"/changes/${URLEncoder.encode(project, "UTF-8")}~425967/revisions/5/actions"
+ val project = "true-wealth/b2b"
+ val what = s"/changes/${URLEncoder.encode(project, "UTF-8")}~425967/revisions/5/actions"
val accessPath = "REST_API"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
}
it should "extract the decoded project name from a /projects/ url" in {
- val project = "VidScale/UDNTests"
- val what = s"/projects/${URLEncoder.encode(project, "UTF-8")}"
+ val project = "VidScale/UDNTests"
+ val what = s"/projects/${URLEncoder.encode(project, "UTF-8")}"
val accessPath = "REST_API"
GerritProjects.empty.extractProject(what, accessPath) should contain(project)
@@ -188,61 +203,71 @@
behavior of "GIT HTTP calls"
it should "extract the project from an info ref upload pack target" in {
- val project = "dushyantRathore/Jenkins_Test"
- val what = s"https://review.gerrithub.io/$project/info/refs?service=git-upload-pack"
+ val project = "dushyantRathore/Jenkins_Test"
+ val what = s"https://review.gerrithub.io/$project/info/refs?service=git-upload-pack"
val accessPath = "GIT"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract a project from an SSH upload pack command" in {
- val project = "cbdr/Gjallarhorn"
- val what = s"git-upload-pack./$project"
+ val project = "cbdr/Gjallarhorn"
+ val what = s"git-upload-pack./$project"
val accessPath = "GIT"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract the project from an info ref receive pack target" in {
- val project = "dushyantRathore/Jenkins_Test"
- val what = s"https://review.gerrithub.io/$project/info/refs?service=git-receive-pack"
+ val project = "dushyantRathore/Jenkins_Test"
+ val what = s"https://review.gerrithub.io/$project/info/refs?service=git-receive-pack"
val accessPath = "GIT"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract the project from an upload pack target" in {
- val project = "att-comdev/cicd"
- val what = s"https://review.gerrithub.io/$project/git-upload-pack"
+ val project = "att-comdev/cicd"
+ val what = s"https://review.gerrithub.io/$project/git-upload-pack"
val accessPath = "GIT"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
it should "extract the project from a receive pack target" in {
- val project = "att-comdev/cicd"
- val what = s"https://review.gerrithub.io/$project/git-receive-pack"
+ val project = "att-comdev/cicd"
+ val what = s"https://review.gerrithub.io/$project/git-receive-pack"
val accessPath = "GIT"
- val existingProjects = GerritProjects(Map(
- project -> GerritProject(project)
- ))
+ val existingProjects = GerritProjects(
+ Map(
+ project -> GerritProject(project)
+ )
+ )
existingProjects.extractProject(what, accessPath) should contain(project)
}
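
Note for reviewers: the specs above pin down extractProject's contract — pack and
replication commands carry the project name on their own, while free-form queries
only resolve against an already-known project map. A minimal usage sketch of that
contract (inputs borrowed from the tests above; anything beyond the tested
signature is illustrative, not part of this change):

  val known = GerritProjects(Map("Sonos-Inc/old97s" -> GerritProject("Sonos-Inc/old97s")))

  // Pack commands embed the project, so the empty map suffices.
  GerritProjects.empty
    .extractProject("git-upload-pack./dalihub/dali-toolkit", "SSH_COMMAND") // contains "dalihub/dali-toolkit"

  // A bare query resolves only because its trailing token is a known project.
  known.extractProject(
    "gerrit.query.--format=JSON.--current-patch-set.Sonos-Inc/old97s",
    "SSH_COMMAND"
  ) // contains "Sonos-Inc/old97s"
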
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
index 1b4bf80..c0c935f 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
@@ -39,15 +39,15 @@
"A json representing an http audit event log" should "be parsed into a success of HttpAuditEvent" in {
- val httpMethod = "POST"
- val httpStatus = "200"
- val sessionId = "1r7ywi4vd3jk410dv60pvd19vk"
- val accountId = 1009124
- val accessPath = "GIT"
+ val httpMethod = "POST"
+ val httpStatus = "200"
+ val sessionId = "1r7ywi4vd3jk410dv60pvd19vk"
+ val accountId = 1009124
+ val accessPath = "GIT"
val timeAtStart = 1542240322369L
- val what = "https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
- val elapsed = 12
- val auditUUID = "audit:fe4cff68-d094-474a-9d97-502270b0b2e6"
+ val what = "https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
+ val elapsed = 12
+ val auditUUID = "audit:fe4cff68-d094-474a-9d97-502270b0b2e6"
val jsonEvent =
s"""
|{
@@ -83,18 +83,27 @@
""".stripMargin
val triedEvent = AuditEvent.parseRaw(jsonEvent)
- inside (triedEvent.success.value) {
- case HttpAuditEvent(gotAccessPath,gotHttpMethod,gotHttpStatus,gotSessionId,gotWho,gotTimeAtStart,gotWhat,gotElapsed,gotUUID
- ) =>
- gotSessionId shouldBe sessionId
- gotWho should contain(accountId)
+ inside(triedEvent.success.value) {
+ case HttpAuditEvent(
+ gotAccessPath,
+ gotHttpMethod,
+ gotHttpStatus,
+ gotSessionId,
+ gotWho,
+ gotTimeAtStart,
+ gotWhat,
+ gotElapsed,
+ gotUUID
+ ) =>
+ gotSessionId shouldBe sessionId
+ gotWho should contain(accountId)
gotTimeAtStart shouldBe timeAtStart
- gotHttpMethod shouldBe httpMethod
- gotHttpStatus shouldBe httpStatus
- gotWhat shouldBe what
- gotElapsed shouldBe elapsed
- gotUUID shouldBe auditUUID
- gotAccessPath should contain(accessPath)
+ gotHttpMethod shouldBe httpMethod
+ gotHttpStatus shouldBe httpStatus
+ gotWhat shouldBe what
+ gotElapsed shouldBe elapsed
+ gotUUID shouldBe auditUUID
+ gotAccessPath should contain(accessPath)
}
}
@@ -107,7 +116,7 @@
val what = "gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
val elapsed = 12
val auditUUID = "audit:dd74e098-9260-4720-9143-38a0a0a5e500"
- val sshResult = "0"
+ val sshResult = "0"
val jsonEvent =
s"""
|{
@@ -134,26 +143,35 @@
|}
""".stripMargin
- inside (AuditEvent.parseRaw(jsonEvent).success.value) {
- case SshAuditEvent(gotAccessPath, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID, gotResult) =>
- gotSessionId shouldBe sessionId
- gotWho should contain(accountId)
+ inside(AuditEvent.parseRaw(jsonEvent).success.value) {
+ case SshAuditEvent(
+ gotAccessPath,
+ gotSessionId,
+ gotWho,
+ gotTimeAtStart,
+ gotWhat,
+ gotElapsed,
+ gotUUID,
+ gotResult
+ ) =>
+ gotSessionId shouldBe sessionId
+ gotWho should contain(accountId)
gotTimeAtStart shouldBe timeAtStart
- gotWhat shouldBe what
- gotElapsed shouldBe elapsed
- gotUUID shouldBe auditUUID
- gotAccessPath should contain(accessPath)
- gotResult shouldBe sshResult
+ gotWhat shouldBe what
+ gotElapsed shouldBe elapsed
+ gotUUID shouldBe auditUUID
+ gotAccessPath should contain(accessPath)
+ gotResult shouldBe sshResult
}
}
"A json representing a failed SSH authentication failure" should "be parsed into a success of SshAuditEvent" in {
- val sessionId = "000000000000000000000000000"
- val timeAtStart = 1542240154088L
- val what = "AUTH"
- val elapsed = 0
- val auditUUID = "audit:8d40c495-7b51-4003-81f2-718bc04addf3"
+ val sessionId = "000000000000000000000000000"
+ val timeAtStart = 1542240154088L
+ val what = "AUTH"
+ val elapsed = 0
+ val auditUUID = "audit:8d40c495-7b51-4003-81f2-718bc04addf3"
val failedResult = "FAIL"
val jsonEvent =
s"""
@@ -174,15 +192,24 @@
|}
""".stripMargin
- inside (AuditEvent.parseRaw(jsonEvent).success.value) {
- case SshAuditEvent(gotAccessPath, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID, gotResult) =>
- gotSessionId shouldBe sessionId
- gotWho shouldBe empty
+ inside(AuditEvent.parseRaw(jsonEvent).success.value) {
+ case SshAuditEvent(
+ gotAccessPath,
+ gotSessionId,
+ gotWho,
+ gotTimeAtStart,
+ gotWhat,
+ gotElapsed,
+ gotUUID,
+ gotResult
+ ) =>
+ gotSessionId shouldBe sessionId
+ gotWho shouldBe empty
gotTimeAtStart shouldBe timeAtStart
- gotWhat shouldBe what
- gotElapsed shouldBe elapsed
- gotUUID shouldBe auditUUID
- gotAccessPath shouldBe empty
+ gotWhat shouldBe what
+ gotElapsed shouldBe elapsed
+ gotUUID shouldBe auditUUID
+ gotAccessPath shouldBe empty
gotResult shouldBe failedResult
}
}
@@ -198,7 +225,7 @@
val what = "/config/server/info"
val elapsed = 177
val auditUUID = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
- val jsonEvent =
+ val jsonEvent =
s"""
|{
| "type": "ExtendedHttpAuditEvent",
@@ -225,17 +252,27 @@
|}
""".stripMargin
- inside (AuditEvent.parseRaw(jsonEvent).success.value) {
- case ExtendedHttpAuditEvent(gotAccessPath,gotHttpMethod,gotHttpResult,gotSessionId,gotWho,gotTimeAtStart,gotWhat,gotElapsed,gotUUID) =>
- gotSessionId shouldBe sessionId
- gotWho should contain(accountId)
+ inside(AuditEvent.parseRaw(jsonEvent).success.value) {
+ case ExtendedHttpAuditEvent(
+ gotAccessPath,
+ gotHttpMethod,
+ gotHttpResult,
+ gotSessionId,
+ gotWho,
+ gotTimeAtStart,
+ gotWhat,
+ gotElapsed,
+ gotUUID
+ ) =>
+ gotSessionId shouldBe sessionId
+ gotWho should contain(accountId)
gotTimeAtStart shouldBe timeAtStart
- gotHttpMethod shouldBe httpMethod
- gotWhat shouldBe what
- gotHttpResult shouldBe httpStatus
- gotElapsed shouldBe elapsed
- gotUUID shouldBe auditUUID
- gotAccessPath should contain(accessPath)
+ gotHttpMethod shouldBe httpMethod
+ gotWhat shouldBe what
+ gotHttpResult shouldBe httpStatus
+ gotElapsed shouldBe elapsed
+ gotUUID shouldBe auditUUID
+ gotAccessPath should contain(accessPath)
}
}
@@ -243,85 +280,113 @@
"an HttpAuditEvent" should "be serializable into json" in {
- val httpMethod = "GET"
- val httpStatus = "200"
- val sessionId = "someSessionId"
- val accessPath = "GIT"
+ val httpMethod = "GET"
+ val httpStatus = "200"
+ val sessionId = "someSessionId"
+ val accessPath = "GIT"
val timeAtStart = 1000L
- val elapsed = 12
- val what="https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
- val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
+ val elapsed = 12
+ val what = "https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
+ val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
val event = HttpAuditEvent(
- Some(accessPath), httpMethod, httpStatus, sessionId, None, timeAtStart, what, elapsed, uuid)
+ Some(accessPath),
+ httpMethod,
+ httpStatus,
+ sessionId,
+ None,
+ timeAtStart,
+ what,
+ elapsed,
+ uuid
+ )
val expectedJson: JValue =
- ("session_id" -> sessionId) ~
- ("access_path" -> accessPath) ~
- ("time_at_start" -> timeAtStart) ~
- ("http_method" -> httpMethod) ~
- ("result" -> httpStatus) ~
- ("what" -> what) ~
- ("elapsed" -> elapsed) ~
- ("uuid" -> uuid) ~
- ("audit_type" -> HttpAuditEvent.auditType)
+ ("session_id" -> sessionId) ~
+ ("access_path" -> accessPath) ~
+ ("time_at_start" -> timeAtStart) ~
+ ("http_method" -> httpMethod) ~
+ ("result" -> httpStatus) ~
+ ("what" -> what) ~
+ ("elapsed" -> elapsed) ~
+ ("uuid" -> uuid) ~
+ ("audit_type" -> HttpAuditEvent.auditType)
parse(event.toJsonString) shouldBe expectedJson
}
"an ExtendedHttpAuditEvent" should "be serializable into json" in {
- val httpMethod = "GET"
- val httpStatus = "200"
- val accessPath = "REST_API"
- val sessionId = "someSessionId"
- val accountId = 123
+ val httpMethod = "GET"
+ val httpStatus = "200"
+ val accessPath = "REST_API"
+ val sessionId = "someSessionId"
+ val accountId = 123
val timeAtStart = 1000L
- val what="/config/server/info"
- val elapsed = 22
- val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
+ val what = "/config/server/info"
+ val elapsed = 22
+ val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
- val event = ExtendedHttpAuditEvent(Some(accessPath), httpMethod, httpStatus, sessionId, Some(accountId), timeAtStart, what, elapsed, uuid)
+ val event = ExtendedHttpAuditEvent(
+ Some(accessPath),
+ httpMethod,
+ httpStatus,
+ sessionId,
+ Some(accountId),
+ timeAtStart,
+ what,
+ elapsed,
+ uuid
+ )
val expectedJson: JValue =
- ("session_id" -> sessionId) ~
- ("who" -> accountId) ~
- ("access_path" -> accessPath) ~
- ("time_at_start" -> timeAtStart) ~
- ("http_method" -> httpMethod) ~
- ("result" -> httpStatus) ~
- ("what" -> what) ~
- ("elapsed" -> elapsed) ~
- ("uuid" -> uuid) ~
- ("audit_type" -> ExtendedHttpAuditEvent.auditType)
+ ("session_id" -> sessionId) ~
+ ("who" -> accountId) ~
+ ("access_path" -> accessPath) ~
+ ("time_at_start" -> timeAtStart) ~
+ ("http_method" -> httpMethod) ~
+ ("result" -> httpStatus) ~
+ ("what" -> what) ~
+ ("elapsed" -> elapsed) ~
+ ("uuid" -> uuid) ~
+ ("audit_type" -> ExtendedHttpAuditEvent.auditType)
parse(event.toJsonString) shouldBe expectedJson
}
"an SshAuditEvent" should "be serializable into json" in {
val sshAuditEvent = "SSH"
- val sessionId = "2adc5bef"
- val accountId = 1009124
- val accessPath = "SSH_COMMAND"
- val timeAtStart = 1542240322369L
- val what = "gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
- val elapsed = 12
- val uuid = "audit:dd74e098-9260-4720-9143-38a0a0a5e500"
- val sshResult = "0"
+ val sessionId = "2adc5bef"
+ val accountId = 1009124
+ val accessPath = "SSH_COMMAND"
+ val timeAtStart = 1542240322369L
+ val what = "gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+ val elapsed = 12
+ val uuid = "audit:dd74e098-9260-4720-9143-38a0a0a5e500"
+ val sshResult = "0"
- val event = SshAuditEvent(Some(accessPath), sessionId, Some(accountId), timeAtStart, what, elapsed, uuid, sshResult)
+ val event = SshAuditEvent(
+ Some(accessPath),
+ sessionId,
+ Some(accountId),
+ timeAtStart,
+ what,
+ elapsed,
+ uuid,
+ sshResult
+ )
val expectedJson: JValue =
- ("session_id" -> sessionId) ~
- ("who" -> accountId) ~
- ("access_path" -> accessPath) ~
- ("time_at_start" -> timeAtStart) ~
- ("what" -> what) ~
- ("elapsed" -> elapsed) ~
- ("uuid" -> uuid) ~
- ("audit_type" -> sshAuditEvent) ~
- ("result" -> sshResult)
+ ("session_id" -> sessionId) ~
+ ("who" -> accountId) ~
+ ("access_path" -> accessPath) ~
+ ("time_at_start" -> timeAtStart) ~
+ ("what" -> what) ~
+ ("elapsed" -> elapsed) ~
+ ("uuid" -> uuid) ~
+ ("audit_type" -> sshAuditEvent) ~
+ ("result" -> sshResult)
parse(event.toJsonString) shouldBe expectedJson
}
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala
index db4f264..4efb4d9 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala
@@ -62,10 +62,10 @@
object TimeRangeSpec {
val yesterday: LocalDate = LocalDate.now().minusDays(1)
- val tomorrow: LocalDate = LocalDate.now().plusDays(1)
- val now: LocalDate = LocalDate.now()
- val nowMs: Long = Instant.now().toEpochMilli
- val yesterdayMs = yesterday.atStartOfDay().convertToUTCEpochMillis
- val tomorrowMs = tomorrow.atStartOfDay().convertToUTCEpochMillis
+ val tomorrow: LocalDate = LocalDate.now().plusDays(1)
+ val now: LocalDate = LocalDate.now()
+ val nowMs: Long = Instant.now().toEpochMilli
+ val yesterdayMs = yesterday.atStartOfDay().convertToUTCEpochMillis
+ val tomorrowMs = tomorrow.atStartOfDay().convertToUTCEpochMillis
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
index 950fc12..c0b77b8 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -20,70 +20,70 @@
behavior of "extractCommand"
it should "extract gerrit command from gerrit ssh command" in {
- val what = s"gerrit.command.-f.with.some-r.options"
+ val what = s"gerrit.command.-f.with.some-r.options"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractCommand(what, accessPath) shouldBe "gerrit"
}
it should "extract replication command from replication ssh command" in {
- val what = s"replication.start.GerritCodeReview/*"
+ val what = s"replication.start.GerritCodeReview/*"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractCommand(what, accessPath) shouldBe "replication"
}
it should "extract 'LOGIN' command over SSH" in {
- val what = s"LOGIN"
+ val what = s"LOGIN"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractCommand(what, accessPath) shouldBe what
}
it should "extract 'LOGOUT' command over SSH" in {
- val what = s"LOGOUT"
+ val what = s"LOGOUT"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractCommand(what, accessPath) shouldBe what
}
it should "extract GIT 'git-upload-pack' command over SSH" in {
- val what = s"git-upload-pack./spdk/spdk"
+ val what = s"git-upload-pack./spdk/spdk"
val accessPath = "GIT"
SparkExtractors.extractCommand(what, accessPath) shouldBe "git-upload-pack"
}
it should "extract GIT 'git-receive-pack' command over SSH" in {
- val what = s"git-receive-pack./spdk/spdk"
+ val what = s"git-receive-pack./spdk/spdk"
val accessPath = "GIT"
SparkExtractors.extractCommand(what, accessPath) shouldBe "git-receive-pack"
}
it should "extract GIT 'git-upload-pack' command over HTTP" in {
- val what = s"https://review.gerrithub.io/rhos-infra/patch-components/git-upload-pack"
+ val what = s"https://review.gerrithub.io/rhos-infra/patch-components/git-upload-pack"
val accessPath = "GIT"
SparkExtractors.extractCommand(what, accessPath) shouldBe "git-upload-pack"
}
it should "extract GIT 'git-receive-pack' command over HTTP" in {
- val what = s"https://review.gerrithub.io/spdk/spdk/info/refs?service=git-receive-pack"
+ val what = s"https://review.gerrithub.io/spdk/spdk/info/refs?service=git-receive-pack"
val accessPath = "GIT"
SparkExtractors.extractCommand(what, accessPath) shouldBe "git-receive-pack"
}
it should "extract 'LOGOUT' command over GIT" in {
- val what = s"LOGOUT"
+ val what = s"LOGOUT"
val accessPath = "GIT"
SparkExtractors.extractCommand(what, accessPath) shouldBe what
}
it should "extract http method for rest api calls" in {
- val what = s"/config/server/version"
+ val what = s"/config/server/version"
val accessPath = "REST_API"
val httpMethod = "GET"
@@ -91,7 +91,7 @@
}
it should "extract http method for unknown access path" in {
- val what = s"/config/server/version"
+ val what = s"/config/server/version"
val accessPath = "UNKNOWN"
val httpMethod = "PUT"
@@ -99,21 +99,21 @@
}
it should "extract 'what' when access path is unexpected value" in {
- val what = s"any"
+ val what = s"any"
val accessPath = "unexpected"
SparkExtractors.extractCommand(what, accessPath) shouldBe what
}
it should "extract 'what' when access path is JSON_RPC" in {
- val what = s"ProjectAdminService.changeProjectAccess"
+ val what = s"ProjectAdminService.changeProjectAccess"
val accessPath = "JSON_RPC"
SparkExtractors.extractCommand(what, accessPath) shouldBe what
}
it should "extract failed SSH authentication when no access path is provided and what is AUTH" in {
- val what = s"AUTH"
+ val what = s"AUTH"
val accessPath = null
SparkExtractors.extractCommand(what, accessPath) shouldBe SparkExtractors.FAILED_SSH_AUTH
@@ -123,65 +123,65 @@
it should "extract SSH command arguments" in {
val sshArguments = "stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
- val what = s"gerrit.$sshArguments"
- val accessPath = "SSH_COMMAND"
+ val what = s"gerrit.$sshArguments"
+ val accessPath = "SSH_COMMAND"
SparkExtractors.extractCommandArguments(what, accessPath) should contain(sshArguments)
}
it should "extract GIT command arguments when in the form git-upload-pack.<gitArguments>" in {
val gitArguments = "/spdk/spdk.github.io"
- val what = s"git-upload-pack.$gitArguments"
- val accessPath = "GIT"
+ val what = s"git-upload-pack.$gitArguments"
+ val accessPath = "GIT"
SparkExtractors.extractCommandArguments(what, accessPath) should contain(gitArguments)
}
it should "extract GIT command arguments when in the form git-receive-pack.<gitArguments>" in {
val gitArguments = "/spdk/spdk.github.io"
- val what = s"git-receive-pack.$gitArguments"
- val accessPath = "GIT"
+ val what = s"git-receive-pack.$gitArguments"
+ val accessPath = "GIT"
SparkExtractors.extractCommandArguments(what, accessPath) should contain(gitArguments)
}
it should "extract GIT commands over HTTP endpoint as-is" in {
- val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
+ val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
val accessPath = "GIT"
SparkExtractors.extractCommandArguments(what, accessPath) should contain(what)
}
it should "extract empty arguments for 'LOGOUT' commands" in {
- val what = "LOGOUT"
+ val what = "LOGOUT"
val accessPath = "GIT"
SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
}
it should "extract REST API endpoint as-is" in {
- val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
+ val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
val accessPath = "REST_API"
SparkExtractors.extractCommandArguments(what, accessPath) should contain(what)
}
it should "extract empty arguments for a failed ssh authorization" in {
- val what = s"AUTH"
+ val what = s"AUTH"
val accessPath = null
SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
}
it should "extract empty arguments a JSON _RPC access path" in {
- val what = s"some_command"
+ val what = s"some_command"
val accessPath = "JSON_RPC"
SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
}
it should "extract empty arguments for an unexpected access path" in {
- val what = s"any"
+ val what = s"any"
val accessPath = "unexpected"
SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
@@ -190,65 +190,65 @@
behavior of "extractSubCommand"
it should "extract SSH gerrit sub-command" in {
- val what = s"gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+ val what = s"gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("stream-events")
}
it should "extract SSH replication sub-command" in {
- val what = s"replication.start.GerritCodeReview/*"
+ val what = s"replication.start.GerritCodeReview/*"
val accessPath = "SSH_COMMAND"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("start")
}
it should "return no sub-commands for GIT commands - SSH" in {
- val what = s"git-upload-pack./spdk/spdk.github.io"
+ val what = s"git-upload-pack./spdk/spdk.github.io"
val accessPath = "GIT"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
}
it should "return no sub-commands for GIT commands - HTTP" in {
- val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
+ val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
val accessPath = "GIT"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
}
it should "extract REST API sub-command" in {
- val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
+ val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
val accessPath = "REST_API"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("changes")
}
it should "extract authenticated REST API sub-command" in {
- val what = "/a/changes/ffilz%2Fnfs-ganesha~372229/comments"
+ val what = "/a/changes/ffilz%2Fnfs-ganesha~372229/comments"
val accessPath = "REST_API"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("changes")
}
it should "return no sub-commands failed AUTH" in {
- val what = s"AUTH"
+ val what = s"AUTH"
val accessPath = null
SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
}
it should "return no sub-commands for JSON _RPC commands" in {
- val what = s"some_command"
+ val what = s"some_command"
val accessPath = "JSON_RPC"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
}
it should "return no sub-commands for an unexpected access path" in {
- val what = s"any"
+ val what = s"any"
val accessPath = "unexpected"
SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
}
-}
\ No newline at end of file
+}
diff --git a/build.sbt b/build.sbt
index 2a1b5d6..272e064 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,13 +16,18 @@
val artifact: File = assembly.value
val entryPointBase = s"/app"
- baseDockerfile(projectName = "gitcommits",
- artifact,
- artifactTargetPath = s"$entryPointBase/${name.value}-assembly.jar")
- .copy(baseDirectory(_ / "scripts" / "gerrit-analytics-etl-gitcommits.sh").value,
- file(s"$entryPointBase/gerrit-analytics-etl-gitcommits.sh"))
- .copy(baseDirectory(_ / "scripts" / "wait-for-elasticsearch.sh").value,
- file(s"$entryPointBase/wait-for-elasticsearch.sh"))
+ baseDockerfile(
+ projectName = "gitcommits",
+ artifact,
+ artifactTargetPath = s"$entryPointBase/${name.value}-assembly.jar"
+ ).copy(
+ baseDirectory(_ / "scripts" / "gerrit-analytics-etl-gitcommits.sh").value,
+ file(s"$entryPointBase/gerrit-analytics-etl-gitcommits.sh")
+ )
+ .copy(
+ baseDirectory(_ / "scripts" / "wait-for-elasticsearch.sh").value,
+ file(s"$entryPointBase/wait-for-elasticsearch.sh")
+ )
.cmd(s"/bin/sh", s"$entryPointBase/gerrit-analytics-etl-gitcommits.sh")
}
)
@@ -37,13 +42,18 @@
dockerfile in docker := {
val artifact: File = assembly.value
val entryPointBase = s"/app"
- baseDockerfile(projectName = "auditlog",
- artifact,
- artifactTargetPath = s"$entryPointBase/${name.value}-assembly.jar")
- .copy(baseDirectory(_ / "scripts" / "gerrit-analytics-etl-auditlog.sh").value,
- file(s"$entryPointBase/gerrit-analytics-etl-auditlog.sh"))
- .copy(baseDirectory(_ / "scripts" / "wait-for-elasticsearch.sh").value,
- file(s"$entryPointBase/wait-for-elasticsearch.sh"))
+ baseDockerfile(
+ projectName = "auditlog",
+ artifact,
+ artifactTargetPath = s"$entryPointBase/${name.value}-assembly.jar"
+ ).copy(
+ baseDirectory(_ / "scripts" / "gerrit-analytics-etl-auditlog.sh").value,
+ file(s"$entryPointBase/gerrit-analytics-etl-auditlog.sh")
+ )
+ .copy(
+ baseDirectory(_ / "scripts" / "wait-for-elasticsearch.sh").value,
+ file(s"$entryPointBase/wait-for-elasticsearch.sh")
+ )
.volume(s"$entryPointBase/events/")
.volume(s"$entryPointBase/data/")
.cmd(s"/bin/sh", s"$entryPointBase/gerrit-analytics-etl-auditlog.sh")
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
index 81bf831..942049e 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
@@ -40,8 +40,10 @@
}
- def moveAliasToNewIndex(aliasName: String,
- newIndexName: String): Future[Response[AliasActionResponse]] = {
+ def moveAliasToNewIndex(
+ aliasName: String,
+ newIndexName: String
+ ): Future[Response[AliasActionResponse]] = {
val oldIndices: Future[Iterable[Index]] = getIndicesFromAlias(aliasName)
oldIndices.flatMap { indices =>
@@ -51,7 +53,8 @@
val addAliasAction: AddAliasActionRequest = addAlias(aliasName) on newIndexName
logger.info(
- s"Replacing old indices (${indices.mkString(",")}) with $newIndexName from alias $aliasName")
+ s"Replacing old indices (${indices.mkString(",")}) with $newIndexName from alias $aliasName"
+ )
esClient.execute {
aliases(
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
index 8d68514..efcb3ff 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
@@ -19,11 +19,11 @@
import javax.net.ssl._
object TrustAll extends X509TrustManager {
- override val getAcceptedIssuers: Array[X509Certificate] = Array.empty[X509Certificate]
+ override val getAcceptedIssuers: Array[X509Certificate] = Array.empty[X509Certificate]
override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String): Unit = ()
override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String): Unit = ()
}
object VerifiesAllHostNames extends HostnameVerifier {
override def verify(s: String, sslSession: SSLSession) = true
-}
\ No newline at end of file
+}
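
Note for reviewers: TrustAll and VerifiesAllHostNames disable certificate and
hostname validation entirely, so they should only ever back the opt-in
ignoreSSLCert flag. The trustAllSSLCerts helper that consumes them sits outside
this hunk; a sketch of the standard JSSE wiring it presumably performs (an
assumption, not part of this change):

  import java.security.SecureRandom
  import javax.net.ssl.{HttpsURLConnection, SSLContext, TrustManager}

  // Hypothetical sketch: install the permissive manager and verifier process-wide.
  def trustAllSSLCerts(): Unit = {
    val sslContext = SSLContext.getInstance("TLS")
    sslContext.init(null, Array[TrustManager](TrustAll), new SecureRandom())
    HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory)
    HttpsURLConnection.setDefaultHostnameVerifier(VerifiesAllHostNames)
  }
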
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/gerritApiConnectivity.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/gerritApiConnectivity.scala
index 91fa6aa..8947cc3 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/gerritApiConnectivity.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/gerritApiConnectivity.scala
@@ -24,7 +24,7 @@
sealed trait HttpBasicAuthentication {
- val BASIC = "Basic"
+ val BASIC = "Basic"
val AUTHORIZATION = "Authorization"
def encodeCredentials(username: String, password: String): String = {
@@ -35,12 +35,22 @@
BASIC + " " + encodeCredentials(username, password)
}
-class GerritConnectivity(maybeUsername: Option[String], maybePassword: Option[String], ignoreSSLCert: Boolean = false) extends HttpBasicAuthentication with Serializable with LazyLogging {
- private def createBasicSecuredConnection(url: String, username: String, password: String): BufferedSource = {
+class GerritConnectivity(
+ maybeUsername: Option[String],
+ maybePassword: Option[String],
+ ignoreSSLCert: Boolean = false
+) extends HttpBasicAuthentication
+ with Serializable
+ with LazyLogging {
+ private def createBasicSecuredConnection(
+ url: String,
+ username: String,
+ password: String
+ ): BufferedSource = {
try {
- if(ignoreSSLCert) trustAllSSLCerts()
+ if (ignoreSSLCert) trustAllSSLCerts()
- val unsecureURL = new URL(url)
+ val unsecureURL = new URL(url)
val endPointPath = unsecureURL.getFile
val basicAuthURL = unsecureURL.toString.replace(endPointPath, s"/a$endPointPath")
@@ -49,14 +59,13 @@
val connection = new URL(basicAuthURL).openConnection
connection.setRequestProperty(AUTHORIZATION, getHeader(username, password))
Source.fromInputStream(connection.getInputStream, Codec.UTF8.name)
- }
- catch {
+ } catch {
case e: Exception => throw new Exception(s"Unable to connect to $url. $e")
}
}
private def createNonSecuredConnection(url: String): BufferedSource = {
- if(ignoreSSLCert) trustAllSSLCerts()
+ if (ignoreSSLCert) trustAllSSLCerts()
logger.info(s"Connecting to API $url")
Source.fromURL(url, Codec.UTF8.name)
@@ -68,7 +77,7 @@
username <- maybeUsername
password <- maybePassword
} yield (createBasicSecuredConnection(url, username, password))
- ).getOrElse(createNonSecuredConnection(url))
+ ).getOrElse(createNonSecuredConnection(url))
}
private def trustAllSSLCerts(): Unit = {
diff --git a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
index 1c208aa..5ccaf08 100644
--- a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
@@ -36,13 +36,16 @@
with LazyLogging
with SparkEsClientProvider {
- def saveToEsWithAliasSwap(aliasName: String,
- documentType: String): EnrichedAliasActionResponse = {
+ def saveToEsWithAliasSwap(
+ aliasName: String,
+ documentType: String
+ ): EnrichedAliasActionResponse = {
val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
val newPersistencePath = s"$newIndexNameWithTime/$documentType"
logger.info(
- s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index")
+ s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index"
+ )
import scala.concurrent.ExecutionContext.Implicits.global
// Save data
@@ -52,7 +55,8 @@
.saveToEs(newPersistencePath)
logger.info(
- s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
+ s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName"
+ )
moveAliasToNewIndex(aliasName, newIndexNameWithTime).flatMap { response =>
if (response.isSuccess && response.result.success) {
logger.info("Alias was updated successfully")
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
index 94c53f1..ca25ce4 100644
--- a/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -22,58 +22,62 @@
import scala.util.Try
- object AnalyticsDateTimeFormatter {
+object AnalyticsDateTimeFormatter {
- val yyyy_MM_dd_HHmmss_SSSSSSSSS: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
- val yyyy_MM_dd: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
+ val yyyy_MM_dd_HHmmss_SSSSSSSSS: DateTimeFormatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+ val yyyy_MM_dd: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
- val yyyyMMddHH: SimpleDateFormat = buildSimpleDateFormatUTC("yyyyMMddHH")
- val yyyyMMdd: SimpleDateFormat= buildSimpleDateFormatUTC("yyyyMMdd")
- val yyyyMM: SimpleDateFormat = buildSimpleDateFormatUTC("yyyyMM")
- val yyyy: SimpleDateFormat = buildSimpleDateFormatUTC("yyyy")
+ val yyyyMMddHH: SimpleDateFormat = buildSimpleDateFormatUTC("yyyyMMddHH")
+ val yyyyMMdd: SimpleDateFormat = buildSimpleDateFormatUTC("yyyyMMdd")
+ val yyyyMM: SimpleDateFormat = buildSimpleDateFormatUTC("yyyyMM")
+ val yyyy: SimpleDateFormat = buildSimpleDateFormatUTC("yyyy")
- private def buildSimpleDateFormatUTC(pattern: String): SimpleDateFormat = {
- val simpleDateFormat = new SimpleDateFormat(pattern)
- simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
- simpleDateFormat
- }
+ private def buildSimpleDateFormatUTC(pattern: String): SimpleDateFormat = {
+ val simpleDateFormat = new SimpleDateFormat(pattern)
+ simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+ simpleDateFormat
+ }
+}
+
+object CommonTimeOperations {
+ def nowEpoch: Long = Instant.now().atOffset(ZoneOffset.UTC).toInstant.getEpochSecond
+
+ def epochToSqlTimestampOps(epoch: Long) = new Timestamp(epoch)
+
+ def nowSqlTimestmap: Timestamp = epochToSqlTimestampOps(nowEpoch)
+
+ def utcDateTimeFromEpoch(epoch: Long): LocalDateTime =
+ LocalDateTime.ofEpochSecond(epoch, 0, ZoneOffset.UTC)
+}
+
+object implicits {
+
+ implicit class LocalDateTimeOps(val localDateTime: LocalDateTime) extends AnyVal {
+ def convertToUTCEpochMillis: Long =
+ localDateTime.atOffset(ZoneOffset.UTC).toInstant.toEpochMilli
+
+ def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
}
- object CommonTimeOperations {
- def nowEpoch: Long = Instant.now().atOffset(ZoneOffset.UTC).toInstant.getEpochSecond
+ implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
+ def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
+ Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
- def epochToSqlTimestampOps(epoch: Long) = new Timestamp(epoch)
-
- def nowSqlTimestmap: Timestamp = epochToSqlTimestampOps(nowEpoch)
-
- def utcDateTimeFromEpoch(epoch: Long): LocalDateTime = LocalDateTime.ofEpochSecond(epoch, 0, ZoneOffset.UTC)
+ def parseStringToLocalDate(stringFormat: DateTimeFormatter): Option[LocalDate] =
+ Try(LocalDate.parse(dateStr, stringFormat)).toOption
}
- object implicits {
+}
- implicit class LocalDateTimeOps(val localDateTime: LocalDateTime) extends AnyVal {
- def convertToUTCEpochMillis: Long = localDateTime.atOffset(ZoneOffset.UTC).toInstant.toEpochMilli
+trait DateConversions {
+ val NO_TIMESTAMP = new Timestamp(0L)
- def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
- }
-
- implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
- def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
- Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
-
- def parseStringToLocalDate(stringFormat: DateTimeFormatter): Option[LocalDate] =
- Try(LocalDate.parse(dateStr, stringFormat)).toOption
- }
-
+ implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+ case NO_TIMESTAMP => None
+ case ts => Some(ts.toLocalDateTime.toLocalDate)
}
- trait DateConversions {
- val NO_TIMESTAMP = new Timestamp(0L)
-
- implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
- case NO_TIMESTAMP => None
- case ts => Some(ts.toLocalDateTime.toLocalDate)
- }
-
- implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
- }
+ implicit def nullableStringToOption(nullableString: String): Option[String] =
+ Option(nullableString)
+}
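
Note for reviewers: the reshuffled objects keep every conversion pinned to UTC. A
short usage sketch of the implicits as they read after this change (values are
illustrative; names are taken from the code above):

  import com.gerritforge.analytics.support.ops.AnalyticsDateTimeFormatter
  import com.gerritforge.analytics.support.ops.implicits._
  import java.time.LocalDateTime

  // String -> epoch millis, interpreted as UTC regardless of the JVM default zone.
  val parsed: Option[Long] = "2018-01-01 12:00:00.000000000"
    .parseStringToUTCEpoch(AnalyticsDateTimeFormatter.yyyy_MM_dd_HHmmss_SSSSSSSSS)

  // LocalDateTime -> epoch millis through the same UTC offset.
  val millis: Long = LocalDateTime.of(2018, 1, 1, 12, 0).convertToUTCEpochMillis
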
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala
index 2e0ed66..1a61f62 100644
--- a/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala
@@ -31,7 +31,9 @@
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(
- s"Invalid date '$dateStr' expected format is '${cliDateFormat}'", e)
+ s"Invalid date '$dateStr' expected format is '${cliDateFormat}'",
+ e
+ )
}
}
}
diff --git a/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
index 272bc8c..011ccc3 100644
--- a/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
@@ -39,8 +39,10 @@
// Writing into the first index
val dataIntoIndexOne: Dataset[String] = "Content in the first index".split(" ").toList.toDS()
- Await.result(dataIntoIndexOne.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
- 2 seconds)
+ Await.result(
+ dataIntoIndexOne.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+ 2 seconds
+ )
// Reading from the alias
val resultFromAliasFirst: Dataset[String] =
spark.read.format("es").load(s"$aliasName/$documentName").as[String]
@@ -52,8 +54,10 @@
// Writing into the second index
val dataIntoIndexTwo: Dataset[String] = "Content in the second index".split(" ").toList.toDS()
- Await.result(dataIntoIndexTwo.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
- 2 seconds)
+ Await.result(
+ dataIntoIndexTwo.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+ 2 seconds
+ )
// Reading from the alias
val resultFromAliasSecond: Dataset[String] =
spark.read.format("es").load(s"$aliasName/$documentName").as[String]
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
index 8c2d456..9bc1066 100644
--- a/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
@@ -60,7 +60,8 @@
LocalDateTime
.of(2018, 1, 1, 12, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
- .toInstant.toEpochMilli
+ .toInstant
+ .toEpochMilli
val yyyyMMddHHStr = "2018010112"
AnalyticsDateTimeFormatter.yyyyMMddHH.format(epochValueUTC) should equal(yyyyMMddHHStr)
@@ -71,7 +72,8 @@
LocalDateTime
.of(2018, 1, 1, 12, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
- .toInstant.toEpochMilli
+ .toInstant
+ .toEpochMilli
val yyyyMMddStr = "20180101"
AnalyticsDateTimeFormatter.yyyyMMdd.format(epochValueUTC) should equal(yyyyMMddStr)
@@ -82,7 +84,8 @@
LocalDateTime
.of(2018, 1, 1, 12, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
- .toInstant.toEpochMilli
+ .toInstant
+ .toEpochMilli
val yyyyMMStr = "201801"
AnalyticsDateTimeFormatter.yyyyMM.format(epochValueUTC) should equal(yyyyMMStr)
@@ -93,7 +96,8 @@
LocalDateTime
.of(2018, 1, 1, 12, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
- .toInstant.toEpochMilli
+ .toInstant
+ .toEpochMilli
val yyyyStr = "2018"
AnalyticsDateTimeFormatter.yyyy.format(epochValueUTC) should equal(yyyyStr)
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
index 542637c..69dd337 100644
--- a/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
@@ -33,4 +33,4 @@
val expectedIndexName = s"${indexName}_2019-01-01_${instantUTC.toEpochMilli}"
timeBasedIndexName shouldEqual (expectedIndexName)
}
-}
\ No newline at end of file
+}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
index ce96f28..37f9edd 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
@@ -30,22 +30,38 @@
implicit class PimpedGerritProjectRDD(val rdd: RDD[GerritProject]) extends AnyVal {
- def enrichWithSource(projectToContributorsAnalyticsUrlFactory: String => Option[String]): RDD[ProjectContributionSource] = {
+ def enrichWithSource(
+ projectToContributorsAnalyticsUrlFactory: String => Option[String]
+ ): RDD[ProjectContributionSource] = {
rdd.map { project =>
- ProjectContributionSource(project.name, projectToContributorsAnalyticsUrlFactory(project.id))
+ ProjectContributionSource(
+ project.name,
+ projectToContributorsAnalyticsUrlFactory(project.id)
+ )
}
}
}
- def getProjectJsonContributorsArray(project: String, sourceURL: Option[String], gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
- sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection))
+ def getProjectJsonContributorsArray(
+ project: String,
+ sourceURL: Option[String],
+ gerritApiConnection: GerritConnectivity
+ ): Array[(String, String)] = {
+ sourceURL.toArray.flatMap(
+ getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection)
+ )
}
def filterEmptyStrings(urlSource: BufferedSource): Iterator[String] =
- urlSource.getLines()
+ urlSource
+ .getLines()
.filterNot(_.trim.isEmpty)
- def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String, gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
+ def getProjectJsonContributorsArrayFromUrl(
+ project: String,
+ sourceURL: String,
+ gerritApiConnection: GerritConnectivity
+ ): Array[(String, String)] = {
try {
filterEmptyStrings(gerritApiConnection.getContentFromApi(sourceURL))
.map(s => (project, s))
@@ -70,22 +86,24 @@
case class CommitInfo(sha1: String, date: Long, merge: Boolean)
- case class UserActivitySummary(year: Integer,
- month: Integer,
- day: Integer,
- hour: Integer,
- name: String,
- email: String,
- num_commits: Integer,
- num_files: Integer,
- num_distinct_files: Integer,
- added_lines: Integer,
- deleted_lines: Integer,
- commits: Array[CommitInfo],
- branches: Array[String],
- last_commit_date: Long,
- is_merge: Boolean,
- is_bot_like: Boolean)
+ case class UserActivitySummary(
+ year: Integer,
+ month: Integer,
+ day: Integer,
+ hour: Integer,
+ name: String,
+ email: String,
+ num_commits: Integer,
+ num_files: Integer,
+ num_distinct_files: Integer,
+ added_lines: Integer,
+ deleted_lines: Integer,
+ commits: Array[CommitInfo],
+ branches: Array[String],
+ last_commit_date: Long,
+ is_merge: Boolean,
+ is_bot_like: Boolean
+ )
import org.apache.spark.sql.Encoders
@@ -98,8 +116,7 @@
def extractCommits(df: DataFrame)(implicit spark: SparkSession): Dataset[String] = {
import spark.implicits._
- df
- .select(explode($"commits.sha1"))
+ df.select(explode($"commits.sha1"))
.as[String]
      .distinct() // this distinct might be redundant, but it guarantees the de-duplication contract
}
@@ -110,12 +127,22 @@
import spark.sqlContext.implicits._
df.withColumn("json", from_json($"json", schema))
.selectExpr(
- "project", "json.name as author", "json.email as email",
- "json.year as year", "json.month as month", "json.day as day", "json.hour as hour",
- "json.num_files as num_files", "json.num_distinct_files as num_distinct_files",
- "json.added_lines as added_lines", "json.deleted_lines as deleted_lines",
- "json.num_commits as num_commits", "json.last_commit_date as last_commit_date",
- "json.is_merge as is_merge", "json.commits as commits", "json.branches as branches",
+ "project",
+ "json.name as author",
+ "json.email as email",
+ "json.year as year",
+ "json.month as month",
+ "json.day as day",
+ "json.hour as hour",
+ "json.num_files as num_files",
+ "json.num_distinct_files as num_distinct_files",
+ "json.added_lines as added_lines",
+ "json.deleted_lines as deleted_lines",
+ "json.num_commits as num_commits",
+ "json.last_commit_date as last_commit_date",
+ "json.is_merge as is_merge",
+ "json.commits as commits",
+ "json.branches as branches",
"json.is_bot_like"
)
}
@@ -129,9 +156,13 @@
.withColumnRenamed("organization", "organization_alias")
df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer")
- .withColumn("organization",
- when(renamedAliasesDF("organization_alias").notEqual(""), lower(renamedAliasesDF("organization_alias")))
- .otherwise(df("organization")))
+ .withColumn(
+ "organization",
+ when(
+ renamedAliasesDF("organization_alias").notEqual(""),
+ lower(renamedAliasesDF("organization_alias"))
+ ).otherwise(df("organization"))
+ )
.withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
.drop("email_alias", "author_alias", "organization_alias")
}
@@ -149,9 +180,10 @@
extractCommits(df)
}
- def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
- df
- .addOrganization()
+ def dashboardStats(
+ aliasesDFMaybe: Option[DataFrame]
+ )(implicit spark: SparkSession): DataFrame = {
+ df.addOrganization()
.handleAliases(aliasesDFMaybe)
.dropCommits
}
@@ -159,16 +191,20 @@
private def emailToDomain(email: String): String = email match {
case Email(_, domain) => domain
- case _ => ""
+ case _ => ""
}
private def emailToDomainUdf = udf(emailToDomain(_: String))
- implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
+ implicit class PimpedRDDProjectContributionSource(
+ val projectsAndUrls: RDD[ProjectContributionSource]
+ ) extends AnyVal {
- def fetchRawContributors(gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession): RDD[(String, String)] = {
- projectsAndUrls.flatMap {
- p => getProjectJsonContributorsArray(p.name, p.contributorsUrl, gerritApiConnection)
+ def fetchRawContributors(
+ gerritApiConnection: GerritConnectivity
+ )(implicit spark: SparkSession): RDD[(String, String)] = {
+ projectsAndUrls.flatMap { p =>
+ getProjectJsonContributorsArray(p.name, p.contributorsUrl, gerritApiConnection)
}
}
}
@@ -180,10 +216,15 @@
def longDateToISO(in: Number): String =
ZonedDateTime.ofInstant(
LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
- ZoneOffset.UTC, ZoneId.of("Z")
+ ZoneOffset.UTC,
+ ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
- def getContributorStats(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String], gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession) = {
+ def getContributorStats(
+ projects: RDD[GerritProject],
+ projectToContributorsAnalyticsUrlFactory: String => Option[String],
+ gerritApiConnection: GerritConnectivity
+ )(implicit spark: SparkSession) = {
import spark.sqlContext.implicits._ // toDF
projects
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 6f51c0d..447102a 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -16,7 +16,11 @@
import java.time.LocalDate
-import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
+import com.gerritforge.analytics.gitcommits.model.{
+ GerritEndpointConfig,
+ GerritProject,
+ GerritProjectsSupport
+}
import com.gerritforge.analytics.spark.SparkApp
import com.gerritforge.analytics.support.ops.ReadsOps._
import com.typesafe.scalalogging.LazyLogging
@@ -115,13 +119,16 @@
logger.info(
s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]")
- else projects.mkString("[", ",", "]")}")
+ else projects.mkString("[", ",", "]")}"
+ )
val aliasesDF = getAliasDF(config.emailAlias)
- val contributorsStats = getContributorStats(spark.sparkContext.parallelize(projects),
- config.contributorsUrl,
- config.gerritApiConnection)
+ val contributorsStats = getContributorStats(
+ spark.sparkContext.parallelize(projects),
+ config.contributorsUrl,
+ config.gerritApiConnection
+ )
contributorsStats.dashboardStats(aliasesDF)
}
@@ -146,7 +153,8 @@
def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
config.gerritProjectsUrl.toSeq.flatMap { url =>
GerritProjectsSupport.parseJsonProjectListResponse(
- config.gerritApiConnection.getContentFromApi(url))
+ config.gerritApiConnection.getContentFromApi(url)
+ )
}
}
}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
index e99a8f1..c18fe0e 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
@@ -18,13 +18,13 @@
object Email {
val emailWithOutExtension = """(.*?)@([^.]+)$""".r
- val emailWithExtension = """(.*?)@(.*?)(?:\.co)?.[a-z]{2,4}$""".r
+ val emailWithExtension = """(.*?)@(.*?)(?:\.co)?.[a-z]{2,4}$""".r
def unapply(emailString: String): Option[(String, String)] = {
emailString.toLowerCase match {
- case emailWithOutExtension(u,d) => Some(u,d)
- case emailWithExtension(u,d) => Some(u,d)
- case _ => None
+ case emailWithOutExtension(u, d) => Some(u, d)
+ case emailWithExtension(u, d) => Some(u, d)
+ case _ => None
}
}
}
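
Note for reviewers: unapply lowercases before matching, and the second pattern
strips a trailing (optionally .co-prefixed) extension, so plain and country-code
domains both reduce to the bare domain name:

  Email.unapply("Jane.Doe@Example.com") // Some(("jane.doe", "example"))
  Email.unapply("dev@company.co.uk")    // Some(("dev", "company"))
  Email.unapply("not-an-email")         // None
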
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
index c853b2e..a27611f 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
@@ -33,9 +33,11 @@
username: Option[String] = None,
password: Option[String] = None,
ignoreSSLCert: Option[Boolean] = None,
- extractBranches: Option[Boolean] = None) {
+ extractBranches: Option[Boolean] = None
+) {
- val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password, ignoreSSLCert.getOrElse(false))
+ val gerritApiConnection: GerritConnectivity =
+ new GerritConnectivity(username, password, ignoreSSLCert.getOrElse(false))
val gerritProjectsUrl: Option[String] = baseUrl.map { url =>
s"${url}/projects/" + prefix.fold("")("?p=" + _)
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
index 31b6a3a..3259f4d 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
@@ -37,8 +37,7 @@
object GerritProjectsSupport {
def parseJsonProjectListResponse(jsonSource: Source): Seq[GerritProject] = {
- parse(jsonSource.dropGerritPrefix.mkString)
- .values
+ parse(jsonSource.dropGerritPrefix.mkString).values
.asInstanceOf[Map[String, Map[String, String]]]
.mapValues(projectAttributes => projectAttributes("id"))
.toSeq
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
index 51f1c39..3587b16 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
@@ -13,7 +13,7 @@
listenUrl match {
case portRegex(_, port, path) =>
val url = s"http://127.0.0.1:$port/$path"
- if(url.endsWith("/")) {
+ if (url.endsWith("/")) {
Some(url.dropRight(1))
} else {
Some(url)
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index 7fa8239..d647f84 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -4,7 +4,11 @@
import java.time.LocalDate
import com.gerritforge.analytics.gitcommits.job.{FetchProjects, Job}
-import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
+import com.gerritforge.analytics.gitcommits.model.{
+ GerritEndpointConfig,
+ GerritProject,
+ GerritProjectsSupport
+}
import com.google.gerrit.server.project.ProjectControl
import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
import com.google.inject.Inject
@@ -15,11 +19,14 @@
import scala.util.{Failure, Success}
-@CommandMetaData(name = "processGitCommits",
- description = "Start the extraction of Git Commits Gerrit analytics")
-class ProcessGitCommitsCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport,
- val gerritConfig: GerritConfigSupport)
- extends SshCommand
+@CommandMetaData(
+ name = "processGitCommits",
+ description = "Start the extraction of Git Commits Gerrit analytics"
+)
+class ProcessGitCommitsCommand @Inject()(
+ implicit val gerritProjects: GerritProjectsSupport,
+ val gerritConfig: GerritConfigSupport
+) extends SshCommand
with Job
with DateConversions
with FetchProjects
@@ -37,37 +44,46 @@
@ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
var endDate: Timestamp = NO_TIMESTAMP
- @ArgOption(name = "--aggregate",
- aliases = Array("-g"),
- usage = "aggregate email/email_hour/email_day/email_month/email_year")
+ @ArgOption(
+ name = "--aggregate",
+ aliases = Array("-g"),
+ usage = "aggregate email/email_hour/email_day/email_month/email_year"
+ )
var aggregate: String = "email_day"
- @ArgOption(name = "--email-aliases",
- aliases = Array("-a"),
- usage = "\"emails to author alias\" input data path")
+ @ArgOption(
+ name = "--email-aliases",
+ aliases = Array("-a"),
+ usage = "\"emails to author alias\" input data path"
+ )
var emailAlias: String = null
- @ArgOption(name = "--ignore-ssl-cert",
+ @ArgOption(
+ name = "--ignore-ssl-cert",
aliases = Array("-k"),
- usage = "Ignore SSL certificate validation")
+ usage = "Ignore SSL certificate validation"
+ )
var ignoreSSLCert: Boolean = false
- @ArgOption(name = "--extract-branches",
- aliases = Array("-r"),
- usage = "enables branches extraction for each commit")
+ @ArgOption(
+ name = "--extract-branches",
+ aliases = Array("-r"),
+ usage = "enables branches extraction for each commit"
+ )
var extractBranches: Boolean = false
override def run() {
- implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(),
- prefix =
- Option(projectControl).map(_.getProject.getName),
- "",
- elasticIndex,
- beginDate,
- endDate,
- aggregate,
- emailAlias,
- ignoreSSLCert=Some(ignoreSSLCert))
+ implicit val config = GerritEndpointConfig(
+ gerritConfig.getListenUrl(),
+ prefix = Option(projectControl).map(_.getProject.getName),
+ "",
+ elasticIndex,
+ beginDate,
+ endDate,
+ aggregate,
+ emailAlias,
+ ignoreSSLCert = Some(ignoreSSLCert)
+ )
implicit val spark: SparkSession = SparkSession
.builder()
@@ -90,7 +106,8 @@
config.elasticIndex.foreach { esIndex =>
stdout.println(
- s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'")
+ s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'"
+ )
stdout.flush()
import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
import scala.concurrent.ExecutionContext.Implicits.global
@@ -114,15 +131,17 @@
}
def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
- config.prefix.toSeq.flatMap(projectName =>
- gerritProjects.getProject(projectName) match {
- case Success(project) =>
- Seq(project)
- case Failure(e) => {
- logger.warn(s"Unable to fetch project $projectName", e)
- Seq()
+ config.prefix.toSeq.flatMap(
+ projectName =>
+ gerritProjects.getProject(projectName) match {
+ case Success(project) =>
+ Seq(project)
+ case Failure(e) => {
+ logger.warn(s"Unable to fetch project $projectName", e)
+ Seq()
+ }
}
- })
+ )
}
}
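For context on the command above: a Gerrit plugin exposes an SshCommand by binding it in an SSH module, which is not part of this diff. A minimal sketch of that wiring (the module name is an assumption; PluginCommandModule and command(...) are standard gerrit-plugin-api):

    import com.google.gerrit.sshd.PluginCommandModule

    class SshModule extends PluginCommandModule {
      // Registers the command under this plugin's SSH namespace, using the
      // name declared in @CommandMetaData ("processGitCommits").
      override protected def configureCommands(): Unit =
        command(classOf[ProcessGitCommitsCommand])
    }

Once bound, the command is reachable over the Gerrit SSH port, e.g. something like ssh -p 29418 admin@gerrit analytics-etl-gitcommits processGitCommits --until 2018-12-31 -g email_day (the plugin name comes from the Gerrit-PluginName manifest attribute set in SharedSettings.scala below).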
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 72e0836..3947759 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/gitcommits/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -19,7 +19,11 @@
import com.gerritforge.analytics.common.api.GerritConnectivity
import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.gitcommits.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
+import com.gerritforge.analytics.gitcommits.model.{
+ GerritProject,
+ GerritProjectsSupport,
+ ProjectContributionSource
+}
import org.apache.spark.sql.Row
import org.json4s.JsonDSL._
import org.json4s._
@@ -29,12 +33,16 @@
import scala.collection.mutable
import scala.io.{Codec, Source}
-class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
+class GerritAnalyticsTransformationsSpec
+ extends FlatSpec
+ with Matchers
+ with SparkTestSupport
+ with Inside {
"GerritProjects" should "parse JSON into a GerritProject objects" in {
- val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromString(
- """)]}'
+ val projects = GerritProjectsSupport.parseJsonProjectListResponse(
+ Source.fromString(""")]}'
|{
| "All-Projects-name": {
| "id": "All-Projects-id",
@@ -43,9 +51,13 @@
| "id": "Test-id",
| }
|}
- |""".stripMargin))
+ |""".stripMargin)
+ )
- projects should contain only(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
+ projects should contain only (GerritProject("All-Projects-id", "All-Projects-name"), GerritProject(
+ "Test-id",
+ "Test-name"
+ ))
}
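A side note on the )]}' line in the fixture: Gerrit prefixes every REST JSON response with )]}' to defeat cross-site script inclusion, so the parser has to discard that first line before handing the payload to json4s. A minimal sketch of that step (dropXssiPrefix is a hypothetical helper; parseJsonProjectListResponse presumably does the equivalent internally):

    import scala.io.Source

    // Drop Gerrit's ")]}'" anti-XSSI prefix line and keep the JSON payload.
    def dropXssiPrefix(response: Source): String =
      response.getLines().drop(1).mkString("\n")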
"enrichWithSource" should "enrich project RDD object with its source" in {
@@ -75,36 +87,39 @@
val expectedResult = List("LineOne", "LineTwo", "LineThree")
val inputStream = new ByteArrayInputStream(contentWithEmptyLines.getBytes)
- val contentWithoutEmptyLines = filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
+ val contentWithoutEmptyLines =
+ filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
contentWithoutEmptyLines.toList should contain only (expectedResult: _*)
}
"fetchRawContributors" should "fetch file content from the initial list of project names and file names" in {
- val line1 = "foo" -> "bar"
- val line2 = "foo1" -> "bar1"
- val line3 = "foo2" -> "bar2"
+ val line1 = "foo" -> "bar"
+ val line2 = "foo1" -> "bar1"
+ val line3 = "foo2" -> "bar2"
val line3b = "foo3" -> "bar3"
val projectSource1 = ProjectContributionSource("p1", newSource(line1, line2, line3))
val projectSource2 = ProjectContributionSource("p2", newSource(line3b))
- val rawContributors = sc.parallelize(Seq(projectSource1, projectSource2))
+ val rawContributors = sc
+ .parallelize(Seq(projectSource1, projectSource2))
.fetchRawContributors(new GerritConnectivity(None, None))(spark)
.collect
rawContributors should have size (4)
- rawContributors should contain allOf(
- ("p1","""{"foo":"bar"}"""),
- ("p1","""{"foo1":"bar1"}"""),
+ rawContributors should contain allOf (
+ ("p1", """{"foo":"bar"}"""),
+ ("p1", """{"foo1":"bar1"}"""),
("p1", """{"foo2":"bar2"}"""),
("p2", """{"foo3":"bar3"}""")
)
}
it should "fetch file content from the initial list of project names and file names with non-latin chars" in {
- val rawContributors = sc.parallelize(Seq(ProjectContributionSource("p1", newSource("foo2" -> "bar2\u0100"))))
+ val rawContributors = sc
+ .parallelize(Seq(ProjectContributionSource("p1", newSource("foo2" -> "bar2\u0100"))))
.fetchRawContributors(new GerritConnectivity(None, None))
.collect
@@ -115,57 +130,140 @@
"transformCommitterInfo" should "transform a DataFrame with project and json to a workable DF with separated columns" in {
import sql.implicits._
- val rdd = sc.parallelize(Seq(
- ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "is_bot_like": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}], "branches": ["master", "stable-2.14"]}"""),
- ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "is_bot_like":true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}, "branches":[]"""),
- // last commit is missing hour,day,month,year to check optionality
- ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true, "is_bot_like":false,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}, "branches":[]""")
- ))
+ val rdd = sc.parallelize(
+ Seq(
+ (
+ "p1",
+ """{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "is_bot_like": false, "commits":[{ "sha1": "e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}], "branches": ["master", "stable-2.14"]}"""
+ ),
+ (
+ "p2",
+ """{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "is_bot_like":true, "commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}, "branches":[]"""
+ ),
+ // last commit is missing hour,day,month,year to check optionality
+ (
+ "p3",
+ """{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true, "is_bot_like":false,"commits":[{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"e063a806c33bd524e89a87732bd3f1ad9a77a41e", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}, "branches":[]"""
+ )
+ )
+ )
- val df = rdd.toDF("project", "json")
- .transformCommitterInfo
+ val df = rdd.toDF("project", "json").transformCommitterInfo
df.count should be(3)
val collected: Array[Row] = df.collect
- df.schema.fields.map(_.name) should contain inOrder(
- "project", "author", "email",
- "year", "month", "day", "hour",
- "num_files", "num_distinct_files", "added_lines", "deleted_lines",
- "num_commits", "last_commit_date",
- "is_merge", "commits", "branches", "is_bot_like")
+ df.schema.fields.map(_.name) should contain inOrder ("project", "author", "email",
+ "year", "month", "day", "hour",
+ "num_files", "num_distinct_files", "added_lines", "deleted_lines",
+ "num_commits", "last_commit_date",
+ "is_merge", "commits", "branches", "is_bot_like")
- collected should contain allOf(
- Row("p1", "a", "a@mail.com", 2017, 9, 11, 23, 2, 2, 1, 1, 1, 0, false,
- new mutable.WrappedArray.ofRef(Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, false))), new mutable.WrappedArray.ofRef(Array("master", "stable-2.14")), false),
- Row("p2", "b", "b@mail.com", 2017, 9, 11, 23, 2, 3, 1, 1, 428, 1500000000000L, true,
- new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true))), null, true),
- Row("p3", "c", "c@mail.com", null, null, null, null, 4, 2, 1, 1, 12, 1600000000000L, true,
- new mutable.WrappedArray.ofRef[Row](Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0l, true), Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true))), null, false)
+ collected should contain allOf (
+ Row(
+ "p1",
+ "a",
+ "a@mail.com",
+ 2017,
+ 9,
+ 11,
+ 23,
+ 2,
+ 2,
+ 1,
+ 1,
+ 1,
+ 0,
+ false,
+ new mutable.WrappedArray.ofRef(
+ Array(Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0L, false))
+ ),
+ new mutable.WrappedArray.ofRef(Array("master", "stable-2.14")),
+ false
+ ),
+ Row(
+ "p2",
+ "b",
+ "b@mail.com",
+ 2017,
+ 9,
+ 11,
+ 23,
+ 2,
+ 3,
+ 1,
+ 1,
+ 428,
+ 1500000000000L,
+ true,
+ new mutable.WrappedArray.ofRef[Row](
+ Array(
+ Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0L, true),
+ Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1500000000000L, true)
+ )
+ ),
+ null,
+ true
+ ),
+ Row(
+ "p3",
+ "c",
+ "c@mail.com",
+ null,
+ null,
+ null,
+ null,
+ 4,
+ 2,
+ 1,
+ 1,
+ 12,
+ 1600000000000L,
+ true,
+ new mutable.WrappedArray.ofRef[Row](
+ Array(
+ Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 0L, true),
+ Row("e063a806c33bd524e89a87732bd3f1ad9a77a41e", 1600000000000L, true)
+ )
+ ),
+ null,
+ false
+ )
)
}
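The JSON fixtures above all follow one per-contributor record layout; the p3 row deliberately omits the time fields. Purely for orientation, the shape corresponds to case classes along these lines (names inferred from the JSON keys; the engine itself decodes through a Spark schema, not these hypothetical classes):

    case class CommitInfo(sha1: String, date: Long, merge: Boolean, files: Seq[String])

    case class ContributorRecord(
        name: String,
        email: String,
        year: Option[Int], // year/month/day/hour are optional, as the p3 fixture shows
        month: Option[Int],
        day: Option[Int],
        hour: Option[Int],
        num_commits: Long,
        num_files: Long,
        num_distinct_files: Long,
        added_lines: Long,
        deleted_lines: Long,
        last_commit_date: Long,
        is_merge: Boolean,
        is_bot_like: Boolean,
        commits: Seq[CommitInfo],
        branches: Option[Seq[String]]
    )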
"handleAliases" should "enrich the data with author from the alias DF" in {
import spark.implicits._
- val aliasDF = sc.parallelize(Seq(
- ("aliased_author", "aliased_email@aliased_author.com", "")
- )).toDF("author", "email", "organization")
+ val aliasDF = sc
+ .parallelize(
+ Seq(
+ ("aliased_author", "aliased_email@aliased_author.com", "")
+ )
+ )
+ .toDF("author", "email", "organization")
- val inputSampleDF = sc.parallelize(Seq(
- ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
- ("author_from_name_b", "aliased_email@aliased_author.com", "aliased_author.com")
- )).toDF("author", "email", "organization")
+ val inputSampleDF = sc
+ .parallelize(
+ Seq(
+ ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+ ("author_from_name_b", "aliased_email@aliased_author.com", "aliased_author.com")
+ )
+ )
+ .toDF("author", "email", "organization")
- val expectedDF = sc.parallelize(Seq(
- ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
- ("aliased_author", "aliased_email@aliased_author.com", "aliased_author.com")
- )).toDF("author", "email", "organization")
+ val expectedDF = sc
+ .parallelize(
+ Seq(
+ ("author_from_name_a", "non_aliased_email@a_mail.com", "a_mail.com"),
+ ("aliased_author", "aliased_email@aliased_author.com", "aliased_author.com")
+ )
+ )
+ .toDF("author", "email", "organization")
val df = inputSampleDF.handleAliases(Some(aliasDF))
- df.schema.fields.map(_.name) should contain allOf(
- "author", "email", "organization")
+ df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
df.collect should contain theSameElementsAs expectedDF.collect
}
@@ -173,23 +271,55 @@
it should "enrich the data with organization from the alias DF when available" in {
import spark.implicits._
- val aliasDF = sc.parallelize(Seq(
- ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
- ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
- ("aliased_author_null_organization", "aliased_email@null_organization.com", null)
- )).toDF("author", "email", "organization")
+ val aliasDF = sc
+ .parallelize(
+ Seq(
+ (
+ "aliased_author_with_organization",
+ "aliased_email@aliased_organization.com",
+ "aliased_organization"
+ ),
+ ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
+ ("aliased_author_null_organization", "aliased_email@null_organization.com", null)
+ )
+ )
+ .toDF("author", "email", "organization")
- val inputSampleDF = sc.parallelize(Seq(
- ("author_from_name_a", "aliased_email@aliased_organization.com", "aliased_organization.com"),
- ("author_from_name_b", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
- ("author_from_name_c", "aliased_email@null_organization.com", "null_organization.com")
- )).toDF("author", "email", "organization")
+ val inputSampleDF = sc
+ .parallelize(
+ Seq(
+ (
+ "author_from_name_a",
+ "aliased_email@aliased_organization.com",
+ "aliased_organization.com"
+ ),
+ ("author_from_name_b", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
+ ("author_from_name_c", "aliased_email@null_organization.com", "null_organization.com")
+ )
+ )
+ .toDF("author", "email", "organization")
- val expectedDF = sc.parallelize(Seq(
- ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
- ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", "emtpy_organization.com"),
- ("aliased_author_null_organization", "aliased_email@null_organization.com", "null_organization.com")
- )).toDF("author", "email", "organization")
+ val expectedDF = sc
+ .parallelize(
+ Seq(
+ (
+ "aliased_author_with_organization",
+ "aliased_email@aliased_organization.com",
+ "aliased_organization"
+ ),
+ (
+ "aliased_author_empty_organization",
+ "aliased_email@emtpy_organization.com",
+ "emtpy_organization.com"
+ ),
+ (
+ "aliased_author_null_organization",
+ "aliased_email@null_organization.com",
+ "null_organization.com"
+ )
+ )
+ )
+ .toDF("author", "email", "organization")
val df = inputSampleDF.handleAliases(Some(aliasDF))
@@ -200,13 +330,21 @@
it should "return correct columns when alias DF is defined" in {
import spark.implicits._
- val inputSampleDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com", "an_organization")
- )).toDF("author", "email", "organization")
+ val inputSampleDF = sc
+ .parallelize(
+ Seq(
+ ("author_name", "email@mail.com", "an_organization")
+ )
+ )
+ .toDF("author", "email", "organization")
- val aliasDF = sc.parallelize(Seq(
- ("a_random_author", "a_random_email@mail.com", "a_random_organization")
- )).toDF("author", "email", "organization")
+ val aliasDF = sc
+ .parallelize(
+ Seq(
+ ("a_random_author", "a_random_email@mail.com", "a_random_organization")
+ )
+ )
+ .toDF("author", "email", "organization")
val df = inputSampleDF.handleAliases(Some(aliasDF))
@@ -227,19 +365,31 @@
it should "lowercase aliased organizations" in {
import spark.implicits._
- val inputSampleDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com", "an_organization")
- )).toDF("author", "email", "organization")
+ val inputSampleDF = sc
+ .parallelize(
+ Seq(
+ ("author_name", "email@mail.com", "an_organization")
+ )
+ )
+ .toDF("author", "email", "organization")
- val aliasDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com", "OrGaNiZaTiOnToBeLoWeRcAsEd")
- )).toDF("author", "email", "organization")
+ val aliasDF = sc
+ .parallelize(
+ Seq(
+ ("author_name", "email@mail.com", "OrGaNiZaTiOnToBeLoWeRcAsEd")
+ )
+ )
+ .toDF("author", "email", "organization")
val df = inputSampleDF.handleAliases(Some(aliasDF))
- val expectedDF = sc.parallelize(Seq(
- ("author_name", "email@mail.com", "organizationtobelowercased")
- )).toDF("author", "email", "organization")
+ val expectedDF = sc
+ .parallelize(
+ Seq(
+ ("author_name", "email@mail.com", "organizationtobelowercased")
+ )
+ )
+ .toDF("author", "email", "organization")
df.collect should contain theSameElementsAs expectedDF.collect
}
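Taken together, the handleAliases tests pin the behaviour down: author is always taken from the alias row when the email matches, organization is taken from the alias row only when it is non-null and non-empty, and aliased organizations are lowercased. A join-based sketch consistent with those assertions (not the actual implementation):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{coalesce, col, lower, trim, when}

    def handleAliases(df: DataFrame, aliasesDF: Option[DataFrame]): DataFrame =
      aliasesDF.fold(df) { aliases =>
        val a = aliases
          .withColumnRenamed("author", "alias_author")
          .withColumnRenamed("organization", "alias_organization")
        df.join(a, Seq("email"), "left_outer")
          .withColumn(
            "organization",
            // null or empty alias organizations fall back to the input value
            lower(
              when(trim(col("alias_organization")) =!= "", col("alias_organization"))
                .otherwise(col("organization"))
            )
          )
          .withColumn("author", coalesce(col("alias_author"), col("author")))
          .drop("alias_author", "alias_organization")
      }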
@@ -247,24 +397,28 @@
"addOrganization" should "compute organization column from the email" in {
import sql.implicits._
- val df = sc.parallelize(Seq(
- "",
- "@", // corner case
- "not an email",
- "email@domain-simple",
- "email@domain-com.com",
- "email@domain-couk.co.uk",
- "email@domain-info.info",
- "email@mail.companyname-couk.co.uk",
- "email@mail.companyname-com.com",
- "email@mail.companyname-info.info"
- )).toDF("email")
+ val df = sc
+ .parallelize(
+ Seq(
+ "",
+ "@", // corner case
+ "not an email",
+ "email@domain-simple",
+ "email@domain-com.com",
+ "email@domain-couk.co.uk",
+ "email@domain-info.info",
+ "email@mail.companyname-couk.co.uk",
+ "email@mail.companyname-com.com",
+ "email@mail.companyname-info.info"
+ )
+ )
+ .toDF("email")
val transformed = df.addOrganization()
- transformed.schema.fields.map(_.name) should contain allOf("email", "organization")
+ transformed.schema.fields.map(_.name) should contain allOf ("email", "organization")
- transformed.collect should contain allOf(
+ transformed.collect should contain allOf (
Row("", ""),
Row("@", ""),
Row("not an email", ""),
@@ -281,15 +435,28 @@
"extractCommitsPerProject" should "generate a Dataset with the all the SHA of commits with associated project" in {
import sql.implicits._
- val committerInfo = sc.parallelize(Seq(
- ("p1","""{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "sha_1", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}] }"""),
- ("p2","""{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"sha_2", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"sha_3", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}"""),
- // last commit is missing hour,day,month,year to check optionality
- ("p3","""{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"sha_4", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"sha_5", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}""")
- )).toDF("project", "json")
+ val committerInfo = sc
+ .parallelize(
+ Seq(
+ (
+ "p1",
+ """{"name":"a","email":"a@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":1, "num_files": 2, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":0, "is_merge": false, "commits":[{ "sha1": "sha_1", "date":0,"merge":false, "files": ["file1.txt", "file2.txt"]}] }"""
+ ),
+ (
+ "p2",
+ """{"name":"b","email":"b@mail.com","year":2017,"month":9, "day":11, "hour":23, "num_commits":428, "num_files": 2, "num_distinct_files": 3, "added_lines":1, "deleted_lines":1, "last_commit_date":1500000000000, "is_merge": true, "commits":[{"sha1":"sha_2", "date":0,"merge":true, "files": ["file3.txt", "file4.txt"] },{"sha1":"sha_3", "date":1500000000000,"merge":true, "files": ["file1.txt", "file4.txt"]}]}"""
+ ),
+ // last commit is missing hour,day,month,year to check optionality
+ (
+ "p3",
+ """{"name":"c","email":"c@mail.com","num_commits":12,"num_files": 4, "num_distinct_files": 2, "added_lines":1, "deleted_lines":1, "last_commit_date":1600000000000,"is_merge": true,"commits":[{"sha1":"sha_4", "date":0,"merge":true, "files": ["file1.txt", "file2.txt"] },{"sha1":"sha_5", "date":1600000000000,"merge":true, "files": ["file1.txt", "file2.txt"]}]}"""
+ )
+ )
+ )
+ .toDF("project", "json")
.transformCommitterInfo
- committerInfo.commitSet.collect() should contain only(
+ committerInfo.commitSet.collect() should contain only (
"sha_1",
"sha_2",
"sha_3",
@@ -300,7 +467,10 @@
}
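commitSet, asserted on just above, plausibly flattens the nested commits array down to its SHA-1 strings. A sketch of that extraction against the schema produced by transformCommitterInfo (hypothetical code, shown only to make the assertion concrete):

    import org.apache.spark.sql.{DataFrame, Dataset}
    import org.apache.spark.sql.functions.explode

    def commitSet(df: DataFrame): Dataset[String] = {
      import df.sparkSession.implicits._
      // commits is array<struct<sha1,...>>; commits.sha1 projects it to
      // array<string>, and explode emits one row per SHA-1.
      df.select(explode($"commits.sha1")).as[String].distinct
    }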
private def newSource(contributorsJson: JObject*): Option[String] = {
- val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),s"${getClass.getName}-${System.nanoTime()}")
+ val tmpFile = File.createTempFile(
+ System.getProperty("java.io.tmpdir"),
+ s"${getClass.getName}-${System.nanoTime()}"
+ )
val out = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)
contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
index fe65ec5..45911a5 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
+++ b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
@@ -78,7 +78,7 @@
it should "not match an invalid mail format" in {
"invalid email" match {
case Email(_, _) => fail("Invalid emails should be rejected")
- case _ =>
+ case _ =>
}
}
}
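The pattern match above relies on Email being a Scala extractor; its definition sits outside this diff. A minimal sketch of the shape the test implies (the real extractor is presumably stricter about what counts as a valid address):

    object Email {
      private val Parts = """([^@\s]+)@([^@\s]+)""".r

      // Splits "user@domain" into its halves; anything else fails the match.
      def unapply(email: String): Option[(String, String)] =
        email match {
          case Parts(user, domain) => Some((user, domain))
          case _                   => None
        }
    }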
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
index a839430..6830a6e 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
+++ b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
@@ -20,7 +20,7 @@
"gerritProjectsUrl" should "contain prefix when available" in {
val prefix = "prefixMustBeThere"
- val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = Some(prefix))
+ val conf = GerritEndpointConfig(baseUrl = Some("testBaseUrl"), prefix = Some(prefix))
conf.gerritProjectsUrl should contain(s"testBaseUrl/projects/?p=$prefix")
}
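The assertion documents how the projects URL is put together: the REST path /projects/ hangs off baseUrl and the optional prefix becomes the p= query parameter. A sketch that would satisfy it (the real GerritEndpointConfig may add encoding or further parameters):

    def gerritProjectsUrl(baseUrl: Option[String], prefix: Option[String]): Option[String] =
      baseUrl.map { url =>
        s"$url/projects/" + prefix.fold("")(p => s"?p=$p")
      }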
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index bfc3b4d..c3426ba 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -22,8 +22,8 @@
object SharedSettings {
val elastic4s = Seq(
- "com.sksamuel.elastic4s" %% "elastic4s-core" % Elastic4sVersion,
- "com.sksamuel.elastic4s" %% "elastic4s-http" % Elastic4sVersion
+ "com.sksamuel.elastic4s" %% "elastic4s-core" % Elastic4sVersion,
+ "com.sksamuel.elastic4s" %% "elastic4s-http" % Elastic4sVersion
)
private val dockerRepositoryPrefix = "gerrit-analytics-etl"
@@ -35,16 +35,18 @@
fork in Test := true,
git.useGitDescribe := true,
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % sparkVersion % "provided" exclude("org.spark-project.spark", "unused"),
- "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
- "org.elasticsearch" %% "elasticsearch-spark-20" % esSpark excludeAll ExclusionRule(organization = "org.apache.spark"),
- "org.json4s" %% "json4s-native" % json4s,
- "com.google.gerrit" % "gerrit-plugin-api" % gerritApiVersion % Provided withSources(),
- "com.typesafe.scala-logging" %% "scala-logging" % scalaLogging,
- "com.github.scopt" %% "scopt" % scopt,
- "org.scalactic" %% "scalactic" % scalactic % "test",
- "org.scalatest" %% "scalatest" % scalaTest % "test",
- "com.dimafeng" %% "testcontainers-scala" % TestContainersScala % Test
+ "org.apache.spark" %% "spark-core" % sparkVersion % "provided" exclude ("org.spark-project.spark", "unused"),
+ "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
+ "org.elasticsearch" %% "elasticsearch-spark-20" % esSpark excludeAll ExclusionRule(
+ organization = "org.apache.spark"
+ ),
+ "org.json4s" %% "json4s-native" % json4s,
+ "com.google.gerrit" % "gerrit-plugin-api" % gerritApiVersion % Provided withSources (),
+ "com.typesafe.scala-logging" %% "scala-logging" % scalaLogging,
+ "com.github.scopt" %% "scopt" % scopt,
+ "org.scalactic" %% "scalactic" % scalactic % "test",
+ "org.scalatest" %% "scalatest" % scalaTest % "test",
+ "com.dimafeng" %% "testcontainers-scala" % TestContainersScala % Test
) ++ elastic4s
)
@@ -52,8 +54,8 @@
val repositoryName = Seq(dockerRepositoryPrefix, projectName).mkString("-")
Seq(
name := s"analytics-etl-$projectName",
- mainClass in (Compile,run) := Some(s"com.gerritforge.analytics.$projectName.job.Main"),
- packageOptions in(Compile, packageBin) += Package.ManifestAttributes(
+ mainClass in (Compile, run) := Some(s"com.gerritforge.analytics.$projectName.job.Main"),
+ packageOptions in (Compile, packageBin) += Package.ManifestAttributes(
("Gerrit-ApiType", "plugin"),
("Gerrit-PluginName", s"analytics-etl-$projectName"),
("Gerrit-Module", s"com.gerritforge.analytics.$projectName.plugin.Module"),
@@ -79,17 +81,23 @@
)
}
- def baseDockerfile(projectName: String, artifact: File, artifactTargetPath: String): Dockerfile = {
+ def baseDockerfile(
+ projectName: String,
+ artifact: File,
+ artifactTargetPath: String
+ ): Dockerfile = {
new Dockerfile {
from("openjdk:8-alpine")
label("maintainer" -> "GerritForge <info@gerritforge.com>")
runRaw("apk --update add curl tar bash && rm -rf /var/lib/apt/lists/* && rm /var/cache/apk/*")
env("SPARK_VERSION", sparkVersion)
env("SPARK_HOME", "/usr/local/spark-$SPARK_VERSION-bin-hadoop2.7")
- env("PATH","$PATH:$SPARK_HOME/bin")
+ env("PATH", "$PATH:$SPARK_HOME/bin")
env("SPARK_JAR_PATH", artifactTargetPath)
- env("SPARK_JAR_CLASS",s"com.gerritforge.analytics.$projectName.job.Main")
- runRaw("curl -sL \"http://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz\" | tar -xz -C /usr/local")
+ env("SPARK_JAR_CLASS", s"com.gerritforge.analytics.$projectName.job.Main")
+ runRaw(
+ "curl -sL \"http://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz\" | tar -xz -C /usr/local"
+ )
add(artifact, artifactTargetPath)
runRaw(s"chmod +x $artifactTargetPath")
}
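baseDockerfile uses sbt-docker's Dockerfile DSL: from, label, env, runRaw and add mirror the corresponding Dockerfile instructions and are rendered when the docker task runs. Hypothetical wiring in a project definition (the fat-jar task and target path are assumptions):

    dockerfile in docker := baseDockerfile(
      projectName = "gitcommits",
      artifact = assembly.value, // assumes sbt-assembly builds the fat jar
      artifactTargetPath = "/app/analytics-etl-gitcommits.jar"
    )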
@@ -97,14 +105,14 @@
}
object Versions {
- val Elastic4sVersion = "6.5.1"
- val sparkVersion = "2.3.3"
- val gerritApiVersion = "2.13.7"
- val esSpark = "6.2.0"
- val scalaLogging = "3.7.2"
- val scopt = "3.6.0"
- val scalactic = "3.0.1"
- val scalaTest = "3.0.1"
- val json4s = "3.2.11"
+ val Elastic4sVersion = "6.5.1"
+ val sparkVersion = "2.3.3"
+ val gerritApiVersion = "2.13.7"
+ val esSpark = "6.2.0"
+ val scalaLogging = "3.7.2"
+ val scopt = "3.6.0"
+ val scalactic = "3.0.1"
+ val scalaTest = "3.0.1"
+ val json4s = "3.2.11"
val TestContainersScala = "0.23.0"
}
diff --git a/project/docker.sbt b/project/docker.sbt
index b8343b2..1f19e78 100644
--- a/project/docker.sbt
+++ b/project/docker.sbt
@@ -1 +1 @@
-addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")
\ No newline at end of file
+addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.5.0")
diff --git a/project/git.sbt b/project/git.sbt
index e8972cf..f6ecec1 100644
--- a/project/git.sbt
+++ b/project/git.sbt
@@ -1 +1 @@
-addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3")
\ No newline at end of file
+addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3")
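Every hunk in this change is mechanical, and the output is consistent with a scalafmt configuration roughly like the sketch below: a wider line limit (the long dependency strings and JSON fixtures stay on one line, since scalafmt never splits string literals), value alignment in blocks such as Versions, and dangling closing parentheses for multi-line argument lists. This is inferred from the diff, not quoted from the repository's actual .scalafmt.conf:

    # hypothetical .scalafmt.conf reconstructed from the formatting above
    maxColumn = 100
    align = more                # aligns '=' in Versions and '%' in elastic4s
    danglingParentheses = true  # closing ')' goes on its own line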