Extract sub-command from AuditLog events This will allow a better normalization of the data and a better understanding of Gerrit usage. Feature: Issue 10227 Change-Id: I997dd93c49cfcafc0a08f0a0d7cb8bc4d459175e
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala index 2fd3331..8b6667c 100644 --- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala +++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
@@ -23,6 +23,7 @@ access_path: Option[String], command: String, command_arguments: String, + sub_command: Option[String], project: Option[String], result: String, num_events: Long
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala index 59114c2..9b05ce9 100644 --- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala +++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -18,6 +18,7 @@ val TIME_BUCKET_FIELD = "events_time_bucket" val COMMAND_FIELD = "command" val COMMAND_ARGS_FIELD = "command_arguments" + val SUB_COMMAND_FIELD = "sub_command" val USER_IDENTIFIER_FIELD = "user_identifier" val AUDIT_TYPE_FIELD = "audit_type" val ACCESS_PATH_FIELD = "access_path" @@ -33,6 +34,7 @@ ACCESS_PATH_FIELD, COMMAND_FIELD, COMMAND_ARGS_FIELD, + SUB_COMMAND_FIELD, PROJECT_FIELD, RESULT_FIELD )
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala index 9710149..ab1c8a4 100644 --- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala +++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -42,6 +42,7 @@ .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value) .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation) .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD) + .withSubCommandColumns(SUB_COMMAND_FIELD) .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value) .withProjectColumn(PROJECT_FIELD, broadcastGerritProjects.value) .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala index 927556a..cba1d42 100644 --- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala +++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -15,7 +15,7 @@ package com.gerritforge.analytics.auditlog.spark.dataframe.ops import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers} -import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF} +import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF, extractSubCommandUDF} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{udf, _} import org.apache.spark.sql.{Column, DataFrame} @@ -53,6 +53,15 @@ .withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path"))) } + def withSubCommandColumns(subCommandCol: String): DataFrame = { + dataFrame.withColumn(subCommandCol, + extractSubCommandUDF( + col("what"), + col("access_path") + ) + ) + } + def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = { def extractProjectUDF: UserDefinedFunction = udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala index 6abe234..46cf077 100644 --- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala +++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -30,6 +30,12 @@ private val GIT_SSH_COMMAND_ARGUMENTS = capture(r = """git-(?:upload|receive)-pack\.(.+)""") private val GIT_HTTP_COMMAND_ARGUMENTS = capture(r = """(^http.*)""") + // regular expressions to extract sub-commands + // Rest API sub-command example: what = /config/server/version -> sub-command: config + private val REST_API_SUB_COMMAND = capture("""^\/(.*?)(?:\/|\s|$)""") + // SSH sub-command example: what = gerrit.plugin.reload.analytics -> sub-command: plugin + private val SSH_SUB_COMMAND = capture("""^.*?\.(.*?)(?:\.|\s|$)""") + val FAILED_SSH_AUTH = "FAILED_SSH_AUTH" def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match { @@ -57,4 +63,16 @@ } def extractCommandArgumentsUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath)) + + def extractSubCommand(what: String, accessPath: String): Option[String] = accessPath match { + case "REST_API"|"UNKNOWN" => Some(extractOrElse(REST_API_SUB_COMMAND, what, what)) + case "SSH_COMMAND" => Some(extractOrElse(SSH_SUB_COMMAND, what, what)) + case "GIT" => None + case "JSON_RPC" => None + case unexpected => + logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command from '$what'") + None + } + + def extractSubCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractSubCommand(rawCommand, accessPath)) } \ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala index 79df0da..0ee35e6 100644 --- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala +++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -41,6 +41,7 @@ GIT_UPLOAD_PACK, anonymousHttpAuditEvent.what, None, + None, anonymousHttpAuditEvent.result, num_events = 1 ) @@ -62,6 +63,7 @@ GIT_UPLOAD_PACK, anonymousHttpAuditEvent.what, None, + None, anonymousHttpAuditEvent.result, num_events = 1 ) @@ -86,6 +88,7 @@ GIT_UPLOAD_PACK, authenticatedHttpAuditEvent.what, None, + None, authenticatedHttpAuditEvent.result, num_events = 1 ) @@ -106,6 +109,7 @@ sshAuditEvent.accessPath, SSH_GERRIT_COMMAND, SSH_GERRIT_COMMAND_ARGUMENTS, + Some("query"), None, sshAuditEvent.result, num_events = 1 @@ -127,6 +131,7 @@ sshAuditEvent.accessPath, SSH_GERRIT_COMMAND, SSH_GERRIT_COMMAND_ARGUMENTS, + Some("query"), None, sshAuditEvent.result, num_events = 2 @@ -150,6 +155,7 @@ sshAuditEvent.accessPath, SSH_GERRIT_COMMAND, SSH_GERRIT_COMMAND_ARGUMENTS, + Some("query"), None, sshAuditEvent.result, num_events = 1 @@ -162,6 +168,7 @@ sshAuditEvent.accessPath, SSH_GERRIT_COMMAND, SSH_GERRIT_COMMAND_ARGUMENTS, + Some("query"), None, sshAuditEvent.result, num_events = 1 @@ -185,6 +192,7 @@ sshAuditEvent.accessPath, SSH_GERRIT_COMMAND, SSH_GERRIT_COMMAND_ARGUMENTS, + Some("query"), None, sshAuditEvent.result, num_events = 1 @@ -198,6 +206,7 @@ GIT_UPLOAD_PACK, authenticatedHttpAuditEvent.what, None, + None, authenticatedHttpAuditEvent.result, num_events = 1 ) @@ -257,6 +266,15 @@ aggregatedEventsDS.collect.length shouldBe 1 aggregatedEventsDS.collect.head.project should contain(project) } + + it should "extract sub-command" in { + val events = Seq(sshAuditEvent.copy(what = "aCommand.aSubCommand")) + + val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour") + + aggregatedEventsDS.collect.length shouldBe 1 + aggregatedEventsDS.collect.head.sub_command should contain("aSubCommand") + } } object AuditLogsTransformerSpec {
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala index cbf6762..4b5e298 100644 --- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala +++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -186,4 +186,62 @@ SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty } + + behavior of "extractSubCommand" + + it should "extract SSH gerrit sub-command" in { + val what = s"gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added" + val accessPath = "SSH_COMMAND" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("stream-events") + } + + it should "extract SSH replication sub-command" in { + val what = s"replication.start.GerritCodeReview/*" + val accessPath = "SSH_COMMAND" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("start") + } + + it should "return no sub-commands for GIT commands - SSH" in { + val what = s"git-upload-pack./spdk/spdk.github.io" + val accessPath = "GIT" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe None + } + + it should "return no sub-commands for GIT commands - HTTP" in { + val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack" + val accessPath = "GIT" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe None + } + + it should "extract REST API sub-command" in { + val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments" + val accessPath = "REST_API" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("changes") + } + + it should "return no sub-commands failed AUTH" in { + val what = s"AUTH" + val accessPath = null + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe None + } + + it should "return no sub-commands for JSON _RPC commands" in { + val what = s"some_command" + val accessPath = "JSON_RPC" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe None + } + + it should "return no sub-commands for an unexpected access path" in { + val what = s"any" + val accessPath = "unexpected" + + SparkExtractors.extractSubCommand(what, accessPath) shouldBe None + } } \ No newline at end of file