Enrich user info in Auditlogs ETL

Allow enrichment of user information. This change only allows a "user type" to
be defined, but it can easily be extended to any other useful information.

User data is passed via a CSV file with the following format:
id,type
123,bot
456,human

"User type" defaults to "human" for any user not present in this file.

A possible use case for this feature is distinguishing "bot" from "human" users.
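
For example, assuming the job is launched via spark-submit (the jar name and
all paths below are illustrative), the enrichment file is passed through the
new option:

  spark-submit analytics-etl-auditlog.jar \
    --gerritUrl https://gerrit.example.com \
    --elasticSearchIndex audit \
    --eventsPath /path/to/audit_logs \
    --additionalUserInfoPath /path/to/user_info.csv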

Feature: Issue 10229
Change-Id: I042244154e668f5137ff97053f1cc2ad88342acd
diff --git a/README.md b/README.md
index 165c68a..9f5dfa4 100644
--- a/README.md
+++ b/README.md
@@ -147,15 +147,25 @@
 
 ## Parameters
 
-* -u, --gerritUrl             - gerrit server URL (Required)
-* --username                  - Gerrit API Username (Optional)
-* --password                  - Gerrit API Password (Optional)
-* -i, --elasticSearchIndex    - elasticSearch index to persist data into (Required)
-* -p, --eventsPath            - path to a directory (or a file) containing auditLogs events. Supports also _.gz_ files. (Required)
-* -a, --eventsTimeAggregation - Events of the same type, produced by the same user will be aggregated with this time granularity: 'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'
-* -k, --ignoreSSLCert         - Ignore SSL certificate validation (Optional) - Default: false
-* -s, --since                 - process only auditLogs occurred after (and including) this date (Optional)
-* -u, --until                 - process only auditLogs occurred before (and including) this date (Optional)
+* -u, --gerritUrl              - gerrit server URL (Required)
+* --username                   - Gerrit API Username (Optional)
+* --password                   - Gerrit API Password (Optional)
+* -i, --elasticSearchIndex     - elasticSearch index to persist data into (Required)
+* -p, --eventsPath             - path to a directory (or a file) containing auditLogs events. Supports also _.gz_ files. (Required)
+* -a, --eventsTimeAggregation  - Events of the same type, produced by the same user will be aggregated with this time granularity: 'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'
+* -k, --ignoreSSLCert          - Ignore SSL certificate validation (Optional) - Default: false
+* -s, --since                  - process only auditLogs occurred after (and including) this date (Optional)
+* -u, --until                  - process only auditLogs occurred before (and including) this date (Optional)
+* --additionalUserInfoPath     - path to a CSV file containing additional user information (Optional). Currently it is only possible to add the user `type` (e.g.: _bot_, _human_).
+If the type is not specified, the user is considered _human_.
+
+  Here is an example of an additional user information CSV file:
+  ```csv
+  id,type
+  123,"bot"
+  456,"bot"
+  789,"human"
+  ```
 
 ### Build
 
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala
new file mode 100644
index 0000000..4400a70
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/AdditionalUserInfo.scala
@@ -0,0 +1,47 @@
+package com.gerritforge.analytics.auditlog.broadcast
+
+import com.gerritforge.analytics.auditlog.model.AuditLogETLConfig
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+import scala.util.Try
+
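+// Enrichment data for all known accounts, keyed by Gerrit account id and broadcast by AuditLogsTransformer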
+case class AdditionalUsersInfo(usersInfo: Map[GerritAccountId, AdditionalUserInfo]) {
+  def getUserType(who: GerritAccountId): String = usersInfo.get(who).map(_.`type`).getOrElse(AdditionalUserInfo.DEFAULT_USER_TYPE)
+}
+
+object AdditionalUsersInfo {
+  val empty: AdditionalUsersInfo = AdditionalUsersInfo(Map.empty[GerritAccountId, AdditionalUserInfo])
+}
+
+case class AdditionalUserInfo(id: GerritAccountId, `type`: String)
+
+object AdditionalUserInfo {
+  val DEFAULT_USER_TYPE = "human"
+
+  def loadAdditionalUserInfo(config: AuditLogETLConfig)(implicit spark: SparkSession): Try[AdditionalUsersInfo] = {
+
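+    // Expect a header row plus (id, type) columns; failures while reading surface through the returned Try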
+    val schema = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("type", StringType, nullable = false)
+
+    import spark.implicits._
+    Try {
+      AdditionalUsersInfo(
+        config.additionalUserInfoPath.map { path =>
+          spark.read
+            .option("header", "true")
+            .schema(schema)
+            .csv(path)
+            .as[AdditionalUserInfo]
+            // We are collecting on the fair assumption that the additional user info file will fit in memory
+            .collect
+            .map(additionalUserInfo => additionalUserInfo.id -> additionalUserInfo)
+            .toMap
+        }.getOrElse(Map.empty[GerritAccountId, AdditionalUserInfo])
+      )
+    }
+  }
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
index 0c805fb..9f040d2 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
@@ -25,13 +25,13 @@
 import org.json4s.FieldSerializer._
 
 
-case class GerritUserIdentifiers(private val accounts: Map[GerriAccountId, GerritUserIdentifier]) {
-  def getIdentifier(accountId: GerriAccountId): GerritUserIdentifier = accounts.getOrElse(accountId, s"$accountId")
+case class GerritUserIdentifiers(private val accounts: Map[GerritAccountId, GerritUserIdentifier]) {
+  def getIdentifier(accountId: GerritAccountId): GerritUserIdentifier = accounts.getOrElse(accountId, s"$accountId")
 }
 
 object GerritUserIdentifiers extends LazyLogging {
 
-  private case class GerritAccount(accountId: GerriAccountId, username: Option[String], email: Option[String], name: Option[String]) {
+  private case class GerritAccount(accountId: GerritAccountId, username: Option[String], email: Option[String], name: Option[String]) {
     val getIdentifier: GerritUserIdentifier =
       name.getOrElse(
         email.getOrElse(
@@ -40,7 +40,7 @@
       )
   }
 
-  val empty = GerritUserIdentifiers(Map.empty[GerriAccountId, GerritUserIdentifier])
+  val empty = GerritUserIdentifiers(Map.empty[GerritAccountId, GerritUserIdentifier])
 
   private val gerritAccountSerializer = FieldSerializer[GerritAccount](
     deserializer=renameFrom(name="_account_id",newName="accountId")
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
index 4cc45a8..0624eae 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
@@ -15,6 +15,6 @@
 package com.gerritforge.analytics.auditlog
 
 package object broadcast {
-  type GerriAccountId = Int
+  type GerritAccountId = Int
   type GerritUserIdentifier = String
 }
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index adda7f9..8690c58 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.job
 
-import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
 import com.gerritforge.analytics.auditlog.model._
 import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -40,14 +40,20 @@
         sys.exit(1)
       }
 
+      val triedAdditionalUserInfo = AdditionalUserInfo.loadAdditionalUserInfo(config)
+      if (triedAdditionalUserInfo.isFailure) {
+        logger.error("Error loading additional user information", triedAdditionalUserInfo.failed.get)
+        sys.exit(1)
+      }
+
       spark
         .getEventsFromPath(config.eventsPath.get)
-        .transformEvents(tryUserIdentifiers.get, config.eventsTimeAggregation.get, TimeRange(config.since, config.until))
+        .transformEvents(tryUserIdentifiers.get, triedAdditionalUserInfo.get, config.eventsTimeAggregation.get, TimeRange(config.since, config.until))
         .saveToEs(s"${config.elasticSearchIndex.get}/$DOCUMENT_TYPE")
 
     case None =>
       logger.error("Could not parse command line arguments")
       sys.exit(1)
   }
 }
 
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
index 30e4c93..b95d81a 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
@@ -25,5 +25,6 @@
   since: Option[LocalDate] = None,
   until: Option[LocalDate] = None,
   eventsPath: Option[String] = None,
-  eventsTimeAggregation: Option[String] = Some("hour")
+  eventsTimeAggregation: Option[String] = Some("hour"),
+  additionalUserInfoPath: Option[String] = None
 )
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
index 2ff6854..21b6a84 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
@@ -46,6 +46,10 @@
           c.copy(eventsPath = Some(input))
         } text "path to a directory (or a file) containing auditLogs events. Supports also '.gz' files. (Required)"
 
+        opt[String]("additionalUserInfoPath") optional () action { (input, c) =>
+          c.copy(additionalUserInfoPath = Some(input))
+        } text "path to a CSV file containing additional user information (Optional)\n\t\t\t\tCSV must have a header. Example:\n\t\t\t\tid,type\n\t\t\t\t123456,bot\n\t\t\t\t678876,human"
+
         opt[String]('a', "eventsTimeAggregation") optional () action { (input, c) =>
           c.copy(eventsTimeAggregation = Some(input))
         } text "Events of the same type, produced by the same user will be aggregated with this time granularity: " +
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 41d33ec..1980385 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -22,11 +22,14 @@
   val AUDIT_TYPE_FIELD      = "audit_type"
   val ACCESS_PATH_FIELD     = "access_path"
   val RESULT_FIELD          = "result"
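+  // Sourced from the optional --additionalUserInfoPath CSV; defaults to "human"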
+  val USER_TYPE_FIELD       = "user_type"
 
   val FACETING_FIELDS = List(
     TIME_BUCKET_FIELD,
     AUDIT_TYPE_FIELD,
     USER_IDENTIFIER_FIELD,
+    USER_TYPE_FIELD,
     ACCESS_PATH_FIELD,
     COMMAND_FIELD,
     COMMAND_ARGS_FIELD,
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index 1b06829..d9138bc 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.spark
 
-import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, AdditionalUsersInfo, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.AuditEvent
 import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
 import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -23,9 +23,10 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
-case class AuditLogsTransformer(gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty)(implicit spark: SparkSession) {
+case class AuditLogsTransformer(gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty, additionalUsersInfo: AdditionalUsersInfo = AdditionalUsersInfo.empty)(implicit spark: SparkSession) {
 
   private val broadcastUserIdentifiers = spark.sparkContext.broadcast(gerritIdentifiers)
+  private val broadcastAdditionalUsersInfo = spark.sparkContext.broadcast(additionalUsersInfo)
 
   def transform(auditEventsRDD: RDD[AuditEvent], timeAggregation: String, timeRange: TimeRange = TimeRange.always): DataFrame =
     auditEventsRDD
@@ -35,5 +36,6 @@
       .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
       .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
       .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
+      .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
       .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
 }
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index f1ddd9b..80f2e24 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.spark.dataframe.ops
 
-import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF}
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.expressions.UserDefinedFunction
@@ -53,6 +53,13 @@
         .withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
     }
 
+    def withUserTypeColumn(userTypeCol: String, additionalUsersInfo: AdditionalUsersInfo): DataFrame = {
+      def extractUserType: UserDefinedFunction = udf((who: Int) => additionalUsersInfo.getUserType(who))
+
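+      // Events without a "who" value keep a null user type (e.g. anonymous requests)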
+      dataFrame.withColumn(userTypeCol, ifExistThenGetOrNull("who", extractUserType(col("who"))))
+    }
+
     def aggregateNumEventsColumn(numEventsCol: String, cols: List[String]): DataFrame = {
       dataFrame.groupBy(cols.map(c => col(c)): _*)
         .agg(count("*")
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
index 4602aee..22efd88 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.spark.rdd.ops
 
-import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.AuditEvent
 import com.gerritforge.analytics.auditlog.range.TimeRange
 import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
@@ -29,10 +29,12 @@
 
     def toJsonString: RDD[String] = rdd.map(_.toJsonString)
 
-    def transformEvents(gerritUserIdentifiers: GerritUserIdentifiers, timeAggregation: String, timeRange: TimeRange)
-                       (implicit spark: SparkSession): DataFrame =
-      AuditLogsTransformer(gerritUserIdentifiers)
+    def transformEvents(gerritUserIdentifiers: GerritUserIdentifiers, additionalUsersInfo: AdditionalUsersInfo, timeAggregation: String, timeRange: TimeRange)
+                       (implicit spark: SparkSession): DataFrame = {
+
+      AuditLogsTransformer(gerritUserIdentifiers, additionalUsersInfo)
         .transform(rdd, timeAggregation, timeRange)
+    }
   }
 
   implicit class PimpedStringRDD(rdd: RDD[String]) {
@@ -41,5 +43,4 @@
       spark.sqlContext.read.json(rdd.toDS())
     }
   }
-
 }
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
index aade9f9..b0a09a2 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -16,7 +16,7 @@
 import java.sql
 
 import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, AdditionalUsersInfo, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.{ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
 import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
 import com.gerritforge.analytics.support.ops.CommonTimeOperations._
@@ -38,6 +38,7 @@
         AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
         HttpAuditEvent.auditType,
         null, // no user identifier
+        null, // no user type
         anonymousHttpAuditEvent.accessPath.get,
         GIT_UPLOAD_PACK,
         anonymousHttpAuditEvent.what,
@@ -58,6 +59,7 @@
       AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
       HttpAuditEvent.auditType,
       s"${authenticatedHttpAuditEvent.who.get}",
+      AdditionalUserInfo.DEFAULT_USER_TYPE,
       authenticatedHttpAuditEvent.accessPath.get,
       GIT_UPLOAD_PACK,
       authenticatedHttpAuditEvent.what,
@@ -81,6 +83,7 @@
       AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
       HttpAuditEvent.auditType,
       gerritUserIdentifier,
+      AdditionalUserInfo.DEFAULT_USER_TYPE,
       authenticatedHttpAuditEvent.accessPath.get,
       GIT_UPLOAD_PACK,
       authenticatedHttpAuditEvent.what,
@@ -101,6 +104,7 @@
       AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
       SshAuditEvent.auditType,
       s"${sshAuditEvent.who.get}",
+      AdditionalUserInfo.DEFAULT_USER_TYPE,
       sshAuditEvent.accessPath.get,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
@@ -121,6 +125,7 @@
       AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
       SshAuditEvent.auditType,
       s"${sshAuditEvent.who.get}",
+      AdditionalUserInfo.DEFAULT_USER_TYPE,
       sshAuditEvent.accessPath.get,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
@@ -143,6 +148,7 @@
         AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
         SshAuditEvent.auditType,
         s"${sshAuditEvent.who.get}",
+        AdditionalUserInfo.DEFAULT_USER_TYPE,
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
@@ -153,6 +159,7 @@
         AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
         SshAuditEvent.auditType,
         s"${user2Id.get}",
+        AdditionalUserInfo.DEFAULT_USER_TYPE,
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
@@ -176,6 +183,7 @@
         AuditLogsTransformerSpec.epochMillisToNearestHour(events.head.timeAtStart),
         SshAuditEvent.auditType,
         s"${sshAuditEvent.who.get}",
+        AdditionalUserInfo.DEFAULT_USER_TYPE,
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
@@ -186,6 +194,7 @@
         AuditLogsTransformerSpec.epochMillisToNearestHour(events.last.timeAtStart),
         HttpAuditEvent.auditType,
         s"${authenticatedHttpAuditEvent.who.get}",
+        AdditionalUserInfo.DEFAULT_USER_TYPE,
         authenticatedHttpAuditEvent.accessPath.get,
         GIT_UPLOAD_PACK,
         authenticatedHttpAuditEvent.what,
@@ -194,6 +203,40 @@
       )
     )
   }
+
+  it should "process user type" in {
+    val events = Seq(authenticatedHttpAuditEvent)
+
+    val userType = "nonDefaultUserType"
+    val additionalUserInfo = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)
+
+    val dataFrame = AuditLogsTransformer(additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))).transform(
+        auditEventsRDD        = sc.parallelize(events),
+        timeAggregation       = "hour"
+    )
+    dataFrame.collect.length shouldBe 1
+    dataFrame.collect.head.getAs[String](ElasticSearchFields.USER_TYPE_FIELD) shouldBe userType
+  }
+
+  it should "process user type when the gerrit account can be identified" in {
+    val events = Seq(authenticatedHttpAuditEvent)
+    val gerritUserIdentifier = "Antonio Barone"
+
+    val userType = "nonDefaultUserType"
+    val additionalUserInfo = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)
+
+    val dataFrame =
+      AuditLogsTransformer(
+        gerritIdentifiers = GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)),
+        additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
+      ).transform(
+          auditEventsRDD        = sc.parallelize(events),
+          timeAggregation       = "hour"
+      )
+
+    dataFrame.collect.length shouldBe 1
+    dataFrame.collect.head.getAs[String](ElasticSearchFields.USER_TYPE_FIELD) shouldBe userType
+  }
 }
 
 object AuditLogsTransformerSpec {