Extract project from audit logs

Attempt, where possible, to extract project names from audit logs. This
uses a list of public projects whith is retrieved via the /project/
endpoint via the `GerritProjects` class.

Feature: Issue 10225
Change-Id: Ifb41bd9b7c4a2db3d081dbddb0e1cf377f41338c
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala
new file mode 100644
index 0000000..cbb72ac
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjects.scala
@@ -0,0 +1,123 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.broadcast
+import java.net.URLDecoder
+
+import com.gerritforge.analytics.auditlog.util.RegexUtil
+import com.gerritforge.analytics.common.api.GerritConnectivity
+import com.gerritforge.analytics.support.ops.GerritSourceOps._
+import com.typesafe.scalalogging.LazyLogging
+import org.json4s.native.JsonMethods._
+import org.json4s.{DefaultFormats, _}
+
+import scala.annotation.tailrec
+import scala.util.{Failure, Success, Try}
+
+case class GerritProject(name: GerritProjectName)
+case class GerritProjects(private val projects: Map[GerritProjectName, GerritProject]) extends LazyLogging with RegexUtil {
+
+  private val PROJECT_SSH_WITH_SPACES           = capture(r = """project:(.+?)\)?\s""")
+  private val PROJECT_SSH_WITH_BRACKETS         = capture(r = """project:\{\^?(.*)\}""")
+  private val PROJECT_SSH_NO_SPACES             = capture(r = """project(?::|.)(.*)""")
+  private val PROJECT_SSH_PACK                  = capture(r = """(?:git-receive-pack|git-upload-pack)\.\/(.+)""")
+  private val PROJECT_SSH_REPLICATION_START     = capture(r = """replication\.start\.([\.\/a-zA-Z0-9]+(?:-[a-zA-Z0-9\.]+)*)(?:\.|\s|$)""")
+  private val PROJECT_REST_API_CHANGES_SEGMENT  = capture(r = """changes\/([^\/]+)~""")
+  private val PROJECT_REST_API_PROJECTS_SEGMENT = capture(r = """projects\/([^\/]+)""")
+  private val PROJECT_HTTP_PACK_INFO_REF        = capture(r = """https?:\/\/(.+)\/info\/refs\?service=(?:git-upload-pack|git-receive-pack)""")
+  private val PROJECT_HTTP_PACK                 = capture(r = """https?:\/\/(.+)\/(?:git-upload-pack|git-receive-pack)""")
+
+  private val NO_PROJECT_RELATED_COMMANDS = capture(r = """(LOGIN|LOGOUT|AUTH)""")
+
+  private def existProject(id: GerritProjectName): Boolean = projects.get(id).isDefined
+
+  // Helper method to find a project at the start of a string.
+  // For example, the string `redhat-performance/quads.github.com.status:open.foo:bar` will match the project
+  // redhat-performance/quads.github.com, if that project exists in `gerritProject`
+  private def findProjectStringAtStart(rawProject: String, sep: Char = '.'): Option[String] =
+    rawProject.split(sep).foldLeft(List.empty[String]) { case (acc, token) =>
+      acc.headOption.map { t => (t + sep + token) +: acc }.getOrElse(List(token))
+    }.find(existProject)
+
+  // Helper method to find a project at the end of a string.
+  // For example, the string `gerrit.foo.bar.baz.redhat-performance/quads.github.com` will match the project
+  // redhat-performance/quads.github.com, if that project exists in `gerritProject`
+  private def findProjectStringAtEnd(rawProject: String, sep: Char = '.'): Option[String] =
+    rawProject.split(sep).foldRight(List.empty[String]) { case (token, acc) =>
+      acc.headOption.map { t => (token + sep + t) +: acc }.getOrElse(List(token))
+    }.find(existProject)
+
+  def extractProject(what: String, accessPath: String): Option[String] = accessPath match {
+    case _ if matches(NO_PROJECT_RELATED_COMMANDS, what) =>
+      None
+    case "SSH_COMMAND" =>
+      extractGroup(PROJECT_SSH_WITH_SPACES, what)
+        .orElse(extractGroup(PROJECT_SSH_WITH_BRACKETS, what))
+        .orElse(extractGroup(PROJECT_SSH_PACK, what))
+        .orElse(extractGroup(PROJECT_SSH_REPLICATION_START, what))
+        .orElse(extractGroup(PROJECT_SSH_NO_SPACES, what).flatMap(findProjectStringAtStart(_)))
+        .orElse(findProjectStringAtStart(what))
+        .orElse(findProjectStringAtEnd(what))
+    case "REST_API" | "UNKNOWN" =>
+      extractGroup(PROJECT_REST_API_CHANGES_SEGMENT, what)
+        .orElse(extractGroup(PROJECT_REST_API_PROJECTS_SEGMENT, what))
+        .map(URLDecoder.decode(_, "UTF-8"))
+    case "GIT" =>
+      extractGroup(PROJECT_HTTP_PACK_INFO_REF, what)
+        .orElse(extractGroup(PROJECT_HTTP_PACK, what))
+        .flatMap(findProjectStringAtEnd(_, '/'))
+    case unexpected =>
+      logger.warn(s"Unexpected access path '$unexpected' encountered when extracting project from '$what'")
+      None
+  }
+}
+
+object GerritProjects extends LazyLogging {
+
+  val empty = GerritProjects(Map.empty[GerritProjectName, GerritProject])
+
+  implicit private val formats = DefaultFormats
+
+  def loadProjects(gerritConnectivity: GerritConnectivity, gerritUrl: String): Try[GerritProjects] = {
+    val PAGE_SIZE = 500
+    logger.debug(s"Loading gerrit projects...")
+
+    val baseUrl = s"""$gerritUrl/projects/?n=$PAGE_SIZE&query=state%3Aactive%20OR%20state%3Aread-only"""
+
+    @tailrec
+    def loopThroughPages(more: Boolean, triedAcc: Try[GerritProjects] = Success(empty)): Try[GerritProjects] = {
+      if (!more)
+        triedAcc
+      else {
+        val acc = triedAcc.get
+
+        val url              = baseUrl + s"&S=${acc.projects.size}"
+        val accountsJsonPage = gerritConnectivity.getContentFromApi(url).dropGerritPrefix.mkString
+
+        logger.debug(s"Getting gerrit projects - start: ${acc.projects.size}")
+
+        val pageInfo = Try(parse(accountsJsonPage)).map { jsMapProjects =>
+          val thisPageProjects = jsMapProjects.extract[List[GerritProject]].map(prj => prj.name -> prj)
+          (thisPageProjects.size == PAGE_SIZE, acc.copy(projects = acc.projects ++ thisPageProjects))
+        }
+
+        pageInfo match {
+          case Success((newMore, newGerritProjects)) => loopThroughPages(newMore, Success(newGerritProjects))
+          case Failure(exception) => loopThroughPages(more=false, Failure(exception))
+        }
+      }
+    }
+    loopThroughPages(more=true)
+  }
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
index 0624eae..e06aa27 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
@@ -16,5 +16,6 @@
 
 package object broadcast {
   type GerritAccountId = Int
+  type GerritProjectName = String
   type GerritUserIdentifier = String
 }
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 8690c58..0557983 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.job
 
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, GerritProjects, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
 import com.gerritforge.analytics.auditlog.model._
 import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -30,6 +30,17 @@
 
   CommandLineArguments(args) match {
     case Some(config) =>
+
+      val tryProjects = GerritProjects.loadProjects(
+        new GerritConnectivity(config.gerritUsername, config.gerritPassword, config.ignoreSSLCert.getOrElse(false)),
+        config.gerritUrl.get
+      )
+
+      if (tryProjects.isFailure) {
+        logger.error("Error loading public projects", tryProjects.failed.get)
+        sys.exit(1)
+      }
+
       val tryUserIdentifiers = GerritUserIdentifiers.loadAccounts(
         new GerritConnectivity(config.gerritUsername, config.gerritPassword, config.ignoreSSLCert.getOrElse(false)),
         config.gerritUrl.get
@@ -48,7 +59,13 @@
 
       spark
         .getEventsFromPath(config.eventsPath.get)
-        .transformEvents(tryUserIdentifiers.get, triedAdditionalUserInfo.get,config.eventsTimeAggregation.get, TimeRange(config.since, config.until))
+        .transformEvents(
+          tryUserIdentifiers.get,
+          triedAdditionalUserInfo.get,
+          tryProjects.get,
+          config.eventsTimeAggregation.get,
+          TimeRange(config.since, config.until)
+        )
         .saveToEs(s"${config.elasticSearchIndex.get}/$DOCUMENT_TYPE")
 
     case None =>
@@ -56,5 +73,4 @@
       sys.exit(1)
   }
 
-}
-
+}
\ No newline at end of file
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 1980385..59114c2 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -23,6 +23,7 @@
   val ACCESS_PATH_FIELD     = "access_path"
   val RESULT_FIELD          = "result"
   val USER_TYPE_FIELD       = "user_type"
+  val PROJECT_FIELD         = "project"
 
   val FACETING_FIELDS = List(
     TIME_BUCKET_FIELD,
@@ -32,6 +33,7 @@
     ACCESS_PATH_FIELD,
     COMMAND_FIELD,
     COMMAND_ARGS_FIELD,
+    PROJECT_FIELD,
     RESULT_FIELD
   )
 
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index d9138bc..3fef687 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.spark
 
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, AdditionalUsersInfo, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.AuditEvent
 import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
 import com.gerritforge.analytics.auditlog.range.TimeRange
@@ -23,10 +23,15 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
-case class AuditLogsTransformer(gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty, additionalUsersInfo: AdditionalUsersInfo = AdditionalUsersInfo.empty)(implicit spark: SparkSession) {
+case class AuditLogsTransformer(
+  gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty,
+  additionalUsersInfo: AdditionalUsersInfo = AdditionalUsersInfo.empty,
+  gerritProjects: GerritProjects = GerritProjects.empty
+)(implicit spark: SparkSession) {
 
   private val broadcastUserIdentifiers = spark.sparkContext.broadcast(gerritIdentifiers)
   private val broadcastAdditionalUsersInfo = spark.sparkContext.broadcast(additionalUsersInfo)
+  private val broadcastGerritProjects = spark.sparkContext.broadcast(gerritProjects)
 
   def transform(auditEventsRDD: RDD[AuditEvent], timeAggregation: String, timeRange: TimeRange = TimeRange.always): DataFrame =
     auditEventsRDD
@@ -37,5 +42,6 @@
       .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
       .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
       .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
+      .withProjectColumn(PROJECT_FIELD, broadcastGerritProjects.value)
       .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
 }
\ No newline at end of file
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index 80f2e24..927556a 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -14,11 +14,11 @@
 
 package com.gerritforge.analytics.auditlog.spark.dataframe.ops
 
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF}
-import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{udf, _}
+import org.apache.spark.sql.{Column, DataFrame}
 
 import scala.util.Try
 
@@ -53,6 +53,13 @@
         .withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
     }
 
+    def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
+      def extractProjectUDF: UserDefinedFunction = udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
+
+      dataFrame
+        .withColumn(projectCol, extractProjectUDF(col("what"), col("access_path")))
+    }
+
     def withUserTypeColumn(commandCol: String, additionalUsersInfo: AdditionalUsersInfo): DataFrame = {
       def extractUserType: UserDefinedFunction = udf((who: Int) => additionalUsersInfo.getUserType(who))
 
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
index 22efd88..bfc466e 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
@@ -14,7 +14,7 @@
 
 package com.gerritforge.analytics.auditlog.spark.rdd.ops
 
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
 import com.gerritforge.analytics.auditlog.model.AuditEvent
 import com.gerritforge.analytics.auditlog.range.TimeRange
 import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
@@ -29,10 +29,15 @@
 
     def toJsonString: RDD[String] = rdd.map(_.toJsonString)
 
-    def transformEvents(gerritUserIdentifiers: GerritUserIdentifiers, additionalUsersInfo: AdditionalUsersInfo, timeAggregation: String, timeRange: TimeRange)
-                       (implicit spark: SparkSession): DataFrame = {
+    def transformEvents(
+      gerritUserIdentifiers: GerritUserIdentifiers,
+      additionalUsersInfo: AdditionalUsersInfo,
+      gerritProjects: GerritProjects,
+      timeAggregation: String,
+      timeRange: TimeRange
+    )(implicit spark: SparkSession): DataFrame = {
 
-      AuditLogsTransformer(gerritUserIdentifiers, additionalUsersInfo)
+      AuditLogsTransformer(gerritUserIdentifiers, additionalUsersInfo, gerritProjects)
         .transform(rdd, timeAggregation, timeRange)
     }
   }
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
index 013213f..6abe234 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -14,26 +14,24 @@
 
 package com.gerritforge.analytics.auditlog.spark.sql.udf
 
+import com.gerritforge.analytics.auditlog.util.RegexUtil
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.udf
 
-import scala.util.matching.Regex
+case object SparkExtractors extends LazyLogging with RegexUtil {
 
-case object SparkExtractors extends LazyLogging {
-  private val GERRIT_SSH_COMMAND = new Regex("""^(.+?)\.""", "capture")
-  private val GERRIT_SSH_COMMAND_ARGUMENTS = new Regex("""^.+?\.(.+)""", "capture")
+  // regular expressions to extract commands
+  private val GERRIT_SSH_COMMAND = capture(r = """^(.+?)\.""")
+  private val GIT_COMMAND        = capture(r = """.*(git-upload-pack|git-receive-pack)""")
 
-  private val GIT_COMMAND = new Regex(""".*(git-upload-pack|git-receive-pack)""", "capture")
-  private val GIT_SSH_COMMAND_ARGUMENTS = new Regex("""git-(?:upload|receive)-pack\.(.+)""", "capture")
-  private val GIT_HTTP_COMMAND_ARGUMENTS = new Regex("""(^http.*)""", "capture")
+  // regular expressions to extract command arguments
+  private val GERRIT_SSH_COMMAND_ARGUMENTS = capture(r = """^.+?\.(.+)""")
+  private val GIT_SSH_COMMAND_ARGUMENTS    = capture(r = """git-(?:upload|receive)-pack\.(.+)""")
+  private val GIT_HTTP_COMMAND_ARGUMENTS   = capture(r = """(^http.*)""")
 
   val FAILED_SSH_AUTH = "FAILED_SSH_AUTH"
 
-  private def extractOrElse(rx: Regex, target: String, default: String): String = extractGroup(rx, target).getOrElse(default)
-
-  private def extractGroup(rx: Regex, target: String): Option[String] = rx.findAllMatchIn(target).toList.headOption.map(_.group("capture"))
-
   def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match {
     case "SSH_COMMAND"          => extractOrElse(GERRIT_SSH_COMMAND, what, what)
     case "GIT"                  => extractOrElse(GIT_COMMAND, what, what)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala
new file mode 100644
index 0000000..f2cbce3
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/util/RegexUtil.scala
@@ -0,0 +1,27 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.util
+import scala.util.matching.Regex
+
+trait RegexUtil {
+  def capture(r: String) = new Regex(r, "capture")
+
+  def matches(regex: Regex, string: String): Boolean = regex.findFirstIn(string).isDefined
+
+  def extractOrElse(rx: Regex, target: String, default: String): String = extractGroup(rx, target).getOrElse(default)
+
+  def extractGroup(rx: Regex, target: String): Option[String] = rx.findAllMatchIn(target).toList.headOption.map(_.group("capture"))
+
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
index b0a09a2..3b5fefc 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -16,7 +16,7 @@
 import java.sql
 
 import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUserInfo, AdditionalUsersInfo, GerritUserIdentifiers}
+import com.gerritforge.analytics.auditlog.broadcast._
 import com.gerritforge.analytics.auditlog.model.{ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
 import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
 import com.gerritforge.analytics.support.ops.CommonTimeOperations._
@@ -42,6 +42,7 @@
         anonymousHttpAuditEvent.accessPath.get,
         GIT_UPLOAD_PACK,
         anonymousHttpAuditEvent.what,
+        null, // no project
         anonymousHttpAuditEvent.result,
         expectedAggregatedCount
     )
@@ -63,6 +64,7 @@
       authenticatedHttpAuditEvent.accessPath.get,
       GIT_UPLOAD_PACK,
       authenticatedHttpAuditEvent.what,
+      null, // no project
       authenticatedHttpAuditEvent.result,
       expectedAggregatedCount
     )
@@ -87,6 +89,7 @@
       authenticatedHttpAuditEvent.accessPath.get,
       GIT_UPLOAD_PACK,
       authenticatedHttpAuditEvent.what,
+      null, // no project
       authenticatedHttpAuditEvent.result,
       expectedAggregatedCount
     )
@@ -108,6 +111,7 @@
       sshAuditEvent.accessPath.get,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
+      null, // no project
       sshAuditEvent.result,
       expectedAggregatedCount
     )
@@ -129,6 +133,7 @@
       sshAuditEvent.accessPath.get,
       SSH_GERRIT_COMMAND,
       SSH_GERRIT_COMMAND_ARGUMENTS,
+      null, // no project
       sshAuditEvent.result,
       expectedAggregatedCount
     )
@@ -152,6 +157,7 @@
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        null, // no project
         sshAuditEvent.result,
         expectedAggregatedCount
       ),
@@ -163,6 +169,7 @@
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        null, // no project
         sshAuditEvent.result,
         expectedAggregatedCount
       )
@@ -187,6 +194,7 @@
         sshAuditEvent.accessPath.get,
         SSH_GERRIT_COMMAND,
         SSH_GERRIT_COMMAND_ARGUMENTS,
+        null, // no project
         sshAuditEvent.result,
         expectedSshAggregatedCount
       ),
@@ -198,6 +206,7 @@
         authenticatedHttpAuditEvent.accessPath.get,
         GIT_UPLOAD_PACK,
         authenticatedHttpAuditEvent.what,
+        null, // no project
         authenticatedHttpAuditEvent.result,
         expectedHttpAggregatedCount
       )
@@ -237,6 +246,26 @@
     dataFrame.collect.length shouldBe 1
     dataFrame.collect.head.getAs[String](ElasticSearchFields.USER_TYPE_FIELD) shouldBe userType
   }
+
+  it should "extract gerrit project from an http event" in {
+    val events = Seq(authenticatedHttpAuditEvent)
+
+    val dataFrame = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
+      .transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.collect.length shouldBe 1
+    dataFrame.collect.head.getAs[String](ElasticSearchFields.PROJECT_FIELD) shouldBe project
+  }
+
+  it should "extract gerrit project from an ssh event" in {
+    val events = Seq(sshAuditEvent)
+
+    val dataFrame = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
+      .transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.collect.length shouldBe 1
+    dataFrame.collect.head.getAs[String](ElasticSearchFields.PROJECT_FIELD) shouldBe project
+  }
 }
 
 object AuditLogsTransformerSpec {
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
index 7c5b65e..a69ac29 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
@@ -23,13 +23,14 @@
   val timeAtStart = 1544802407000L
   val elapsed = 12
   val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
+  val project = "Mirantis/tcp-qa"
 
   val GIT_UPLOAD_PACK = "git-upload-pack"
 
   val httpMethod = "GET"
   val httpStatus = "200"
 
-  val httpWhat=s"https://review.gerrithub.io/Mirantis/tcp-qa/$GIT_UPLOAD_PACK"
+  val httpWhat=s"https://review.gerrithub.io/$project/$GIT_UPLOAD_PACK"
 
   val anonymousHttpAuditEvent = HttpAuditEvent(Some(gitAccessPath), httpMethod, httpStatus, sessionId, None, timeAtStart, httpWhat, elapsed, uuid)
   val authenticatedHttpAuditEvent: HttpAuditEvent = anonymousHttpAuditEvent.copy(who=Some(userId))
@@ -37,7 +38,7 @@
   val sshAccessPath  = "SSH_COMMAND"
   val sshResult = "0"
   val SSH_GERRIT_COMMAND = "gerrit"
-  val SSH_GERRIT_COMMAND_ARGUMENTS = "stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+  val SSH_GERRIT_COMMAND_ARGUMENTS = s"query.--format.json.--current-patch-set.project:$project"
 
   val sshWhat        = s"$SSH_GERRIT_COMMAND.$SSH_GERRIT_COMMAND_ARGUMENTS"
 
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala
new file mode 100644
index 0000000..57598e4
--- /dev/null
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/broadcast/GerritProjectsSpec.scala
@@ -0,0 +1,238 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.broadcast
+import java.net.URLEncoder
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class GerritProjectsSpec extends FlatSpec with Matchers {
+  behavior of "extract project from SSH commands"
+
+  it should "extract nothing where project could not be found in existing projects" in {
+    val project = "spdk/spdk.github.io"
+    val what = s"gerrit.query.--format.json.--current-patch-set.project:$project"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
+  }
+
+  it should "extract nothing where command does not contain project" in {
+    val what = s"gerrit.command.not.targeting.any.project"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
+  }
+
+  it should "extract nothing where command is LOGIN" in {
+    val what = s"LOGIN"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
+  }
+
+  it should "extract a project from a gerrit query command, where the project is delimited by a space" in {
+    val project = "spdk/spdk.github.io"
+    val what = s"gerrit.query.--format.json.--current-patch-set.project:$project commit:7f4e8237114e957e727707ab6549d482d40d7c92 NOT is:draft"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from a gerrit query command, where the project is wrapped by curly brackets" in {
+    val project = "dalihub/dali-scene-template"
+    val what = s"gerrit.query.project:{^$project}.--format=JSON.--all-approvals.--comments.--all-reviewers"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from a gerrit query command, where the project is dot '.' joined with its arguments" in {
+    val project = "redhat-performance/quads"
+    val what = s"gerrit.query.--format=JSON.project:$project.status:open"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from a gerrit index command" in {
+    val project = "Accumbo/iOS"
+    val what = s"gerrit.index.project.$project"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project included in parenthesis" in {
+
+    val specificProject = s"spdk/spdk.gerrithub.io"
+
+    val what = s"""gerrit.query.--format.json.--current-patch-set.(project:$specificProject) commit:"d2da91b1878f7b7ccd3f30f766ffa2f35cc2011d" NOT is:draft"""
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(specificProject)
+  }
+
+  it should "extract the most specific project when multiple projects names are substrings of each other" in {
+    val genericProject = "redhat-performance/quads"
+
+    val specificProject = s"$genericProject.github.com"
+
+    val what = s"gerrit.query.--format=JSON.project:$specificProject.status:open"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      genericProject -> GerritProject(genericProject),
+      specificProject -> GerritProject(specificProject)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(specificProject)
+  }
+
+  it should "extract a project from a receive pack command" in {
+    val project = "att-comdev/prometheus-openstack-exporter"
+    val what = s"git-receive-pack./$project"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from an upload pack command" in {
+    val project = "dalihub/dali-toolkit"
+    val what = s"git-upload-pack./$project"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from a replication start command that has some arguments" in {
+    val project = "singh4java/springboot"
+    val what = s"replication.start.$project.--wait.--now"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract a project from a replication start command that has no arguments" in {
+    val project = "singh4java/springboot.github.com"
+    val what = s"replication.start.$project"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract nothing when replication start refers to no project" in {
+    val what = s"replication.start.--help"
+    val accessPath = "SSH_COMMAND"
+
+    GerritProjects.empty.extractProject(what, accessPath) shouldBe empty
+  }
+
+  it should "extract a project when is found in at the end of the command, even though the word 'project' is not part of the command" in {
+    val project = "Sonos-Inc/old97s"
+    val what = s"gerrit.query.--format=JSON.--current-patch-set.$project"
+    val accessPath = "SSH_COMMAND"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  behavior of "REST API calls"
+
+  it should "extract the decoded project name from a /changes/ url" in {
+    val project = "true-wealth/b2b"
+    val what = s"/changes/${URLEncoder.encode(project, "UTF-8")}~425967/revisions/5/actions"
+    val accessPath = "REST_API"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract the decoded project name from a /projects/ url" in {
+    val project = "VidScale/UDNTests"
+    val what = s"/projects/${URLEncoder.encode(project, "UTF-8")}"
+    val accessPath = "REST_API"
+
+    GerritProjects.empty.extractProject(what, accessPath) should contain(project)
+  }
+
+  behavior of "GIT HTTP calls"
+
+  it should "extract the project from an info ref upload pack target" in {
+    val project = "dushyantRathore/Jenkins_Test"
+    val what = s"https://review.gerrithub.io/$project/info/refs?service=git-upload-pack"
+    val accessPath = "GIT"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract the project from an info ref receive pack target" in {
+    val project = "dushyantRathore/Jenkins_Test"
+    val what = s"https://review.gerrithub.io/$project/info/refs?service=git-receive-pack"
+    val accessPath = "GIT"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract the project from an upload pack target" in {
+    val project = "att-comdev/cicd"
+    val what = s"https://review.gerrithub.io/$project/git-upload-pack"
+    val accessPath = "GIT"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+  it should "extract the project from a receive pack target" in {
+    val project = "att-comdev/cicd"
+    val what = s"https://review.gerrithub.io/$project/git-receive-pack"
+    val accessPath = "GIT"
+
+    val existingProjects = GerritProjects(Map(
+      project -> GerritProject(project)
+    ))
+
+    existingProjects.extractProject(what, accessPath) should contain(project)
+  }
+
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
index 4ec9210..cbf6762 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -1,4 +1,4 @@
-// Copyright (C) 2018 GerritForge Ltd
+// Copyright (C) 2019 GerritForge Ltd
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -186,5 +186,4 @@
 
     SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
   }
-
 }
\ No newline at end of file