Extract sub-command from AuditLog events
This will allow a better normalization of the data and a better
understanding of Gerrit usage.
Feature: Issue 10227
Change-Id: I997dd93c49cfcafc0a08f0a0d7cb8bc4d459175e
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
index 2fd3331..8b6667c 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
@@ -23,6 +23,7 @@
access_path: Option[String],
command: String,
command_arguments: String,
+ sub_command: Option[String],
project: Option[String],
result: String,
num_events: Long
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 59114c2..9b05ce9 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -18,6 +18,7 @@
val TIME_BUCKET_FIELD = "events_time_bucket"
val COMMAND_FIELD = "command"
val COMMAND_ARGS_FIELD = "command_arguments"
+ val SUB_COMMAND_FIELD = "sub_command"
val USER_IDENTIFIER_FIELD = "user_identifier"
val AUDIT_TYPE_FIELD = "audit_type"
val ACCESS_PATH_FIELD = "access_path"
@@ -33,6 +34,7 @@
ACCESS_PATH_FIELD,
COMMAND_FIELD,
COMMAND_ARGS_FIELD,
+ SUB_COMMAND_FIELD,
PROJECT_FIELD,
RESULT_FIELD
)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index 9710149..ab1c8a4 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -42,6 +42,7 @@
.hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
.withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
.withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
+ .withSubCommandColumns(SUB_COMMAND_FIELD)
.withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
.withProjectColumn(PROJECT_FIELD, broadcastGerritProjects.value)
.aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index 927556a..cba1d42 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -15,7 +15,7 @@
package com.gerritforge.analytics.auditlog.spark.dataframe.ops
import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
-import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF}
+import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF, extractSubCommandUDF}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{Column, DataFrame}
@@ -53,6 +53,15 @@
.withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
}
+ def withSubCommandColumns(subCommandCol: String): DataFrame = {
+ dataFrame.withColumn(subCommandCol,
+ extractSubCommandUDF(
+ col("what"),
+ col("access_path")
+ )
+ )
+ }
+
def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
def extractProjectUDF: UserDefinedFunction = udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
index 6abe234..46cf077 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -30,6 +30,12 @@
private val GIT_SSH_COMMAND_ARGUMENTS = capture(r = """git-(?:upload|receive)-pack\.(.+)""")
private val GIT_HTTP_COMMAND_ARGUMENTS = capture(r = """(^http.*)""")
+ // regular expressions to extract sub-commands
+ // Rest API sub-command example: what = /config/server/version -> sub-command: config
+ private val REST_API_SUB_COMMAND = capture("""^\/(.*?)(?:\/|\s|$)""")
+ // SSH sub-command example: what = gerrit.plugin.reload.analytics -> sub-command: plugin
+ private val SSH_SUB_COMMAND = capture("""^.*?\.(.*?)(?:\.|\s|$)""")
+
val FAILED_SSH_AUTH = "FAILED_SSH_AUTH"
def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match {
@@ -57,4 +63,16 @@
}
def extractCommandArgumentsUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath))
+
+ def extractSubCommand(what: String, accessPath: String): Option[String] = accessPath match {
+ case "REST_API"|"UNKNOWN" => Some(extractOrElse(REST_API_SUB_COMMAND, what, what))
+ case "SSH_COMMAND" => Some(extractOrElse(SSH_SUB_COMMAND, what, what))
+ case "GIT" => None
+ case "JSON_RPC" => None
+ case unexpected =>
+ logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command from '$what'")
+ None
+ }
+
+ def extractSubCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractSubCommand(rawCommand, accessPath))
}
\ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
index 79df0da..0ee35e6 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -41,6 +41,7 @@
GIT_UPLOAD_PACK,
anonymousHttpAuditEvent.what,
None,
+ None,
anonymousHttpAuditEvent.result,
num_events = 1
)
@@ -62,6 +63,7 @@
GIT_UPLOAD_PACK,
anonymousHttpAuditEvent.what,
None,
+ None,
anonymousHttpAuditEvent.result,
num_events = 1
)
@@ -86,6 +88,7 @@
GIT_UPLOAD_PACK,
authenticatedHttpAuditEvent.what,
None,
+ None,
authenticatedHttpAuditEvent.result,
num_events = 1
)
@@ -106,6 +109,7 @@
sshAuditEvent.accessPath,
SSH_GERRIT_COMMAND,
SSH_GERRIT_COMMAND_ARGUMENTS,
+ Some("query"),
None,
sshAuditEvent.result,
num_events = 1
@@ -127,6 +131,7 @@
sshAuditEvent.accessPath,
SSH_GERRIT_COMMAND,
SSH_GERRIT_COMMAND_ARGUMENTS,
+ Some("query"),
None,
sshAuditEvent.result,
num_events = 2
@@ -150,6 +155,7 @@
sshAuditEvent.accessPath,
SSH_GERRIT_COMMAND,
SSH_GERRIT_COMMAND_ARGUMENTS,
+ Some("query"),
None,
sshAuditEvent.result,
num_events = 1
@@ -162,6 +168,7 @@
sshAuditEvent.accessPath,
SSH_GERRIT_COMMAND,
SSH_GERRIT_COMMAND_ARGUMENTS,
+ Some("query"),
None,
sshAuditEvent.result,
num_events = 1
@@ -185,6 +192,7 @@
sshAuditEvent.accessPath,
SSH_GERRIT_COMMAND,
SSH_GERRIT_COMMAND_ARGUMENTS,
+ Some("query"),
None,
sshAuditEvent.result,
num_events = 1
@@ -198,6 +206,7 @@
GIT_UPLOAD_PACK,
authenticatedHttpAuditEvent.what,
None,
+ None,
authenticatedHttpAuditEvent.result,
num_events = 1
)
@@ -257,6 +266,15 @@
aggregatedEventsDS.collect.length shouldBe 1
aggregatedEventsDS.collect.head.project should contain(project)
}
+
+ it should "extract sub-command" in {
+ val events = Seq(sshAuditEvent.copy(what = "aCommand.aSubCommand"))
+
+ val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+ aggregatedEventsDS.collect.length shouldBe 1
+ aggregatedEventsDS.collect.head.sub_command should contain("aSubCommand")
+ }
}
object AuditLogsTransformerSpec {
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
index cbf6762..4b5e298 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -186,4 +186,62 @@
SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
}
+
+ behavior of "extractSubCommand"
+
+ it should "extract SSH gerrit sub-command" in {
+ val what = s"gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+ val accessPath = "SSH_COMMAND"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("stream-events")
+ }
+
+ it should "extract SSH replication sub-command" in {
+ val what = s"replication.start.GerritCodeReview/*"
+ val accessPath = "SSH_COMMAND"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("start")
+ }
+
+ it should "return no sub-commands for GIT commands - SSH" in {
+ val what = s"git-upload-pack./spdk/spdk.github.io"
+ val accessPath = "GIT"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+ }
+
+ it should "return no sub-commands for GIT commands - HTTP" in {
+ val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
+ val accessPath = "GIT"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+ }
+
+ it should "extract REST API sub-command" in {
+ val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
+ val accessPath = "REST_API"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("changes")
+ }
+
+ it should "return no sub-commands failed AUTH" in {
+ val what = s"AUTH"
+ val accessPath = null
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+ }
+
+ it should "return no sub-commands for JSON _RPC commands" in {
+ val what = s"some_command"
+ val accessPath = "JSON_RPC"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+ }
+
+ it should "return no sub-commands for an unexpected access path" in {
+ val what = s"any"
+ val accessPath = "unexpected"
+
+ SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+ }
}
\ No newline at end of file