Extract sub-command from AuditLog events

This will allow a better normalization of the data and a better
understanding of Gerrit usage.

Feature: Issue 10227
Change-Id: I997dd93c49cfcafc0a08f0a0d7cb8bc4d459175e
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
index 2fd3331..8b6667c 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AggregatedAuditEvent.scala
@@ -23,6 +23,7 @@
   access_path: Option[String],
   command: String,
   command_arguments: String,
+  sub_command: Option[String],
   project: Option[String],
   result: String,
   num_events: Long
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 59114c2..9b05ce9 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -18,6 +18,7 @@
   val TIME_BUCKET_FIELD     = "events_time_bucket"
   val COMMAND_FIELD         = "command"
   val COMMAND_ARGS_FIELD    = "command_arguments"
+  val SUB_COMMAND_FIELD     = "sub_command"
   val USER_IDENTIFIER_FIELD = "user_identifier"
   val AUDIT_TYPE_FIELD      = "audit_type"
   val ACCESS_PATH_FIELD     = "access_path"
@@ -33,6 +34,7 @@
     ACCESS_PATH_FIELD,
     COMMAND_FIELD,
     COMMAND_ARGS_FIELD,
+    SUB_COMMAND_FIELD,
     PROJECT_FIELD,
     RESULT_FIELD
   )
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index 9710149..ab1c8a4 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -42,6 +42,7 @@
       .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
       .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
       .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
+      .withSubCommandColumns(SUB_COMMAND_FIELD)
       .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
       .withProjectColumn(PROJECT_FIELD, broadcastGerritProjects.value)
       .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index 927556a..cba1d42 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -15,7 +15,7 @@
 package com.gerritforge.analytics.auditlog.spark.dataframe.ops
 
 import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
-import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF}
+import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF, extractSubCommandUDF}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{udf, _}
 import org.apache.spark.sql.{Column, DataFrame}
@@ -53,6 +53,15 @@
         .withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
     }
 
+    def withSubCommandColumns(subCommandCol: String): DataFrame = {
+      dataFrame.withColumn(subCommandCol,
+        extractSubCommandUDF(
+          col("what"),
+          col("access_path")
+        )
+      )
+    }
+
     def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
       def extractProjectUDF: UserDefinedFunction = udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
 
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
index 6abe234..46cf077 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -30,6 +30,12 @@
   private val GIT_SSH_COMMAND_ARGUMENTS    = capture(r = """git-(?:upload|receive)-pack\.(.+)""")
   private val GIT_HTTP_COMMAND_ARGUMENTS   = capture(r = """(^http.*)""")
 
+  // regular expressions to extract sub-commands
+  // Rest API sub-command example: what = /config/server/version -> sub-command: config
+  private val REST_API_SUB_COMMAND = capture("""^\/(.*?)(?:\/|\s|$)""")
+  // SSH sub-command example: what = gerrit.plugin.reload.analytics -> sub-command: plugin
+  private val SSH_SUB_COMMAND = capture("""^.*?\.(.*?)(?:\.|\s|$)""")
+
   val FAILED_SSH_AUTH = "FAILED_SSH_AUTH"
 
   def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match {
@@ -57,4 +63,16 @@
   }
 
   def extractCommandArgumentsUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath))
+
+  def extractSubCommand(what: String, accessPath: String): Option[String] = accessPath match {
+    case "REST_API"|"UNKNOWN"   => Some(extractOrElse(REST_API_SUB_COMMAND, what, what))
+    case "SSH_COMMAND"          => Some(extractOrElse(SSH_SUB_COMMAND, what, what))
+    case "GIT"                  => None
+    case "JSON_RPC"             => None
+    case unexpected             =>
+      logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command from '$what'")
+      None
+  }
+
+  def extractSubCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractSubCommand(rawCommand, accessPath))
 }
\ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
index 79df0da..0ee35e6 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -41,6 +41,7 @@
       GIT_UPLOAD_PACK,
       anonymousHttpAuditEvent.what,
       None,
+      None,
       anonymousHttpAuditEvent.result,
       num_events = 1
     )
@@ -62,6 +63,7 @@
       GIT_UPLOAD_PACK,
       anonymousHttpAuditEvent.what,
       None,
+      None,
       anonymousHttpAuditEvent.result,
       num_events = 1
     )
@@ -86,6 +88,7 @@
       GIT_UPLOAD_PACK,
       authenticatedHttpAuditEvent.what,
       None,
+      None,
       authenticatedHttpAuditEvent.result,
       num_events = 1
     )
@@ -106,6 +109,7 @@
       sshAuditEvent.accessPath,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
+      Some("query"),
       None,
       sshAuditEvent.result,
       num_events = 1
@@ -127,6 +131,7 @@
       sshAuditEvent.accessPath,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
+      Some("query"),
       None,
       sshAuditEvent.result,
       num_events = 2
@@ -150,6 +155,7 @@
         sshAuditEvent.accessPath,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        Some("query"),
         None,
         sshAuditEvent.result,
         num_events = 1
@@ -162,6 +168,7 @@
         sshAuditEvent.accessPath,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        Some("query"),
         None,
         sshAuditEvent.result,
         num_events = 1
@@ -185,6 +192,7 @@
         sshAuditEvent.accessPath,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        Some("query"),
         None,
         sshAuditEvent.result,
         num_events = 1
@@ -198,6 +206,7 @@
         GIT_UPLOAD_PACK,
         authenticatedHttpAuditEvent.what,
         None,
+        None,
         authenticatedHttpAuditEvent.result,
         num_events = 1
       )
@@ -257,6 +266,15 @@
     aggregatedEventsDS.collect.length shouldBe 1
     aggregatedEventsDS.collect.head.project should contain(project)
   }
+
+  it should "extract sub-command" in {
+    val events = Seq(sshAuditEvent.copy(what = "aCommand.aSubCommand"))
+
+    val aggregatedEventsDS = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    aggregatedEventsDS.collect.length shouldBe 1
+    aggregatedEventsDS.collect.head.sub_command should contain("aSubCommand")
+  }
 }
 
 object AuditLogsTransformerSpec {
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
index cbf6762..4b5e298 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -186,4 +186,62 @@
 
     SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
   }
+
+  behavior of "extractSubCommand"
+
+  it should "extract SSH gerrit sub-command" in {
+    val what = s"gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("stream-events")
+  }
+
+  it should "extract SSH replication sub-command" in {
+    val what = s"replication.start.GerritCodeReview/*"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("start")
+  }
+
+  it should "return no sub-commands for GIT commands - SSH" in {
+    val what = s"git-upload-pack./spdk/spdk.github.io"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+  }
+
+  it should "return no sub-commands for GIT commands - HTTP" in {
+    val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+  }
+
+  it should "extract REST API sub-command" in {
+    val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
+    val accessPath = "REST_API"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe Some("changes")
+  }
+
+  it should "return no sub-commands failed AUTH" in {
+    val what = s"AUTH"
+    val accessPath = null
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+  }
+
+  it should "return no sub-commands for JSON _RPC commands" in {
+    val what = s"some_command"
+    val accessPath = "JSON_RPC"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+  }
+
+  it should "return no sub-commands for an unexpected access path" in {
+    val what = s"any"
+    val accessPath = "unexpected"
+
+    SparkExtractors.extractSubCommand(what, accessPath) shouldBe None
+  }
 }
\ No newline at end of file