Provide Spark job to process AuditLog events

This change provides a new ETL job that processes auditLog events
(typically generated by the audit-sl4j plugin).
Events are normalized, aggregated and then persisted to an
Elasticsearch index. An intended use is to expose this information
through visualization dashboards such as Kibana.

This work also refactors some general-purpose code so it can be shared
by all the ETLs, such as the connection to the Gerrit API and time
operations.

Feature: Issue 9866
Change-Id: I9d1fd9ae569de3b5c0d64cb625356fad3a62fd7a
diff --git a/README.md b/README.md
index eee18aa..a76cf23 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,7 @@
 
 ## Git Commits
 
-Extracts and aggregate git commits data from Gerrit Projects.
+Extracts and aggregates git commits data from Gerrit Projects.
 
 Requires a [Gerrit 2.13.x](https://www.gerritcodereview.com/releases/README.md) or later
 with the [analytics](https://gerrit.googlesource.com/plugins/analytics/)
@@ -132,6 +132,58 @@
           gerritforge/gerrit-analytics-etl-gitcommits:latest
   ```
 
+## Audit Logs
+
+Extracts, aggregates and persists auditLog entries produced by Gerrit via the [audit-sl4j](https://gerrit.googlesource.com/plugins/audit-sl4j/) plugin.
+AuditLog entries are an immutable trace of what happened on Gerrit, and this ETL can leverage that to answer questions such as:
+
+- How is incoming Git traffic distributed?
+- Git/SSH vs. Git/HTTP traffic
+- Git receive-pack vs. upload-pack
+- Top 10 users of receive-pack
+
+and many other questions related to the usage of Gerrit.
+
+The job can be launched, for example, with the following parameters:
+
+```bash
+spark-submit \
+    --class com.gerritforge.analytics.auditlog.job.Main \
+    --conf spark.es.nodes=es.mycompany.com \
+    --conf spark.es.port=9200 \
+    --conf spark.es.index.auto.create=true \
+    $JARS/analytics-etl-auditlog.jar \
+        --gerritUrl https://gerrit.mycompany.com \
+        --elasticSearchIndex gerrit \
+        --eventsPath /path/to/auditlogs \
+        --ignoreSSLCert false \
+        --since 2000-06-01 \
+        --until 2020-12-01
+```
+
+## Parameters
+
+* -u, --gerritUrl             - gerrit server URL (Required)
+* --username                  - Gerrit API Username (Optional)
+* --password                  - Gerrit API Password (Optional)
+* -i, --elasticSearchIndex    - elasticSearch index to persist data into (Required)
+* -p, --eventsPath            - path to a directory (or a file) containing auditLog events. Also supports _.gz_ files. (Required)
+* -a, --eventsTimeAggregation - Events of the same type, produced by the same user, will be aggregated with this time granularity: 'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'
+* -k, --ignoreSSLCert         - Ignore SSL certificate validation (Optional) - Default: false
+* -s, --since                 - process only auditLogs that occurred after (and including) this date, expressed as 'yyyy-MM-dd' (Optional)
+* --until                     - process only auditLogs that occurred before (and including) this date, expressed as 'yyyy-MM-dd' (Optional)
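The `--eventsTimeAggregation` granularity decides which time bucket each event falls into. The ETL performs the bucketing inside Spark, but the idea can be sketched standalone for the default `'hour'` granularity (an illustrative sketch, not the project's implementation):

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Truncate an epoch-millis timestamp down to the start of its hour bucket (UTC).
// Events sharing the same bucket (and type/user) get aggregated together.
def hourBucket(epochMillis: Long): Long =
  Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.HOURS).toEpochMilli
```

For example, two events at 01:00:03 and 01:59:59 land in the same bucket, while an event at 02:00:00 starts a new one.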
+
+### Build
+
+#### JAR
+
+To build the jar file, use:
+
+`sbt analyticsETLAuditLog/assembly`
+
+#### Docker
+
+Not yet available
+
 # Development environment
 
 A docker compose file is provided to spin up an instance of Elasticsearch with Kibana locally.
diff --git a/auditlog/src/main/resources/log4j.properties b/auditlog/src/main/resources/log4j.properties
new file mode 100644
index 0000000..dca6ccc
--- /dev/null
+++ b/auditlog/src/main/resources/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to WARN. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=WARN
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
\ No newline at end of file
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
new file mode 100644
index 0000000..0c805fb
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/GerritUserIdentifiers.scala
@@ -0,0 +1,88 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.broadcast
+
+import com.gerritforge.analytics.common.api.GerritConnectivity
+import com.gerritforge.analytics.support.ops.GerritSourceOps._
+import com.typesafe.scalalogging.LazyLogging
+import org.json4s.native.JsonMethods._
+import org.json4s.{DefaultFormats, _}
+
+import scala.annotation.tailrec
+import scala.util.{Failure, Success, Try}
+import org.json4s.FieldSerializer._
+
+
+case class GerritUserIdentifiers(private val accounts: Map[GerriAccountId, GerritUserIdentifier]) {
+  def getIdentifier(accountId: GerriAccountId): GerritUserIdentifier = accounts.getOrElse(accountId, s"$accountId")
+}
+
+object GerritUserIdentifiers extends LazyLogging {
+
+  private case class GerritAccount(accountId: GerriAccountId, username: Option[String], email: Option[String], name: Option[String]) {
+    val getIdentifier: GerritUserIdentifier =
+      name.getOrElse(
+        email.getOrElse(
+          username.getOrElse(s"$accountId")
+        )
+      )
+  }
+
+  val empty = GerritUserIdentifiers(Map.empty[GerriAccountId, GerritUserIdentifier])
+
+  private val gerritAccountSerializer = FieldSerializer[GerritAccount](
+    deserializer = renameFrom(name = "_account_id", newName = "accountId")
+  )
+
+  implicit val formats: Formats = DefaultFormats + gerritAccountSerializer
+
+  def loadAccounts(gerritConnectivity: GerritConnectivity, gerritUrl: String): Try[GerritUserIdentifiers] = {
+
+    logger.debug(s"Loading gerrit accounts...")
+
+    val baseUrl = s"""$gerritUrl/accounts/?q=name:""&o=details"""
+
+    @tailrec
+    def loopThroughPages(more: Boolean, triedAcc: Try[GerritUserIdentifiers] = Success(empty)): Try[GerritUserIdentifiers] = {
+        if (!more)
+          triedAcc
+        else {
+          val acc = triedAcc.get
+
+          val url              = baseUrl + s"&start=${acc.accounts.size}"
+          val accountsJsonPage = gerritConnectivity.getContentFromApi(url).dropGerritPrefix.mkString
+
+          logger.debug(s"Getting gerrit accounts - start: ${acc.accounts.size}")
+
+          val pageInfo = Try(parse(accountsJsonPage)).map { jsListAccounts =>
+            val more = (jsListAccounts \ "_more_accounts").extractOrElse(default = false)
+
+            val thisPageAccounts = jsListAccounts
+              .extract[List[GerritAccount]]
+              .map( ga => ga.accountId -> ga.getIdentifier)
+              .toMap
+
+            (more, acc.copy(accounts = acc.accounts ++ thisPageAccounts))
+          }
+
+          pageInfo match {
+            case Success((newMore, newGerritAccounts)) => loopThroughPages(newMore, Success(newGerritAccounts))
+            case Failure(exception) => loopThroughPages(more=false, Failure(exception))
+          }
+        }
+    }
+    loopThroughPages(more=true)
+  }
+}
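The `getIdentifier` fallback above (prefer the display name, then email, then username, then the numeric account id) can be sketched standalone with `orElse` chaining, which is equivalent to the nested `getOrElse` calls. `Account` here is a simplified stand-in, not the project's `GerritAccount`:

```scala
// Simplified stand-in: pick the most human-readable identifier available,
// falling back to the numeric account id rendered as a string.
final case class Account(id: Int, username: Option[String], email: Option[String], name: Option[String]) {
  def identifier: String = name.orElse(email).orElse(username).getOrElse(id.toString)
}
```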
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
new file mode 100644
index 0000000..4cc45a8
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/broadcast/package.scala
@@ -0,0 +1,20 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog
+
+package object broadcast {
+  type GerriAccountId = Int
+  type GerritUserIdentifier = String
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index e444603..adda7f9 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -1,6 +1,53 @@
-package com.gerritforge.analytics.auditlog.job
-import com.typesafe.scalalogging.LazyLogging
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
 
-object Main extends App with LazyLogging {
-  // TODO: Implement job here
+package com.gerritforge.analytics.auditlog.job
+
+import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
+import com.gerritforge.analytics.auditlog.model._
+import com.gerritforge.analytics.auditlog.range.TimeRange
+import com.gerritforge.analytics.auditlog.spark.rdd.ops.SparkRDDOps._
+import com.gerritforge.analytics.auditlog.spark.session.ops.SparkSessionOps._
+import com.gerritforge.analytics.common.api.GerritConnectivity
+import com.gerritforge.analytics.spark.SparkApp
+import com.typesafe.scalalogging.LazyLogging
+import org.elasticsearch.spark.sql._
+
+object Main extends SparkApp with App with LazyLogging {
+  override val appName = "Gerrit AuditLog Analytics ETL"
+
+  CommandLineArguments(args) match {
+    case Some(config) =>
+      val tryUserIdentifiers = GerritUserIdentifiers.loadAccounts(
+        new GerritConnectivity(config.gerritUsername, config.gerritPassword, config.ignoreSSLCert.getOrElse(false)),
+        config.gerritUrl.get
+      )
+
+      if (tryUserIdentifiers.isFailure) {
+        logger.error("Error loading gerrit user identifiers", tryUserIdentifiers.failed.get)
+        sys.exit(1)
+      }
+
+      spark
+        .getEventsFromPath(config.eventsPath.get)
+        .transformEvents(tryUserIdentifiers.get, config.eventsTimeAggregation.get, TimeRange(config.since, config.until))
+        .saveToEs(s"${config.elasticSearchIndex.get}/$DOCUMENT_TYPE")
+
+    case None =>
+      logger.error("Could not parse command line arguments")
+      sys.exit(1)
+  }
 }
+
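Main's fail-fast handling (log and exit when the accounts lookup fails, otherwise run the pipeline) is a common `Try` pattern; a minimal sketch with hypothetical names, returning an exit code instead of calling `sys.exit` so it stays testable:

```scala
import scala.util.{Failure, Success, Try}

// Fail fast: turn a failed precondition into a non-zero exit code,
// otherwise run the job body with the loaded value and return 0.
def runOrExit[A](precondition: Try[A])(job: A => Unit): Int =
  precondition match {
    case Failure(e)     => Console.err.println(s"Error: ${e.getMessage}"); 1
    case Success(value) => job(value); 0
  }
```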
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
index 0f4a848..622d68d 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditEvent.scala
@@ -1,100 +1,160 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package com.gerritforge.analytics.auditlog.model
+
+import com.gerritforge.analytics.auditlog.model.json.AuditLogFieldExtractors
 import org.json4s.jackson.Serialization.write
 import org.json4s.native.JsonMethods._
 import org.json4s.{DefaultFormats, _}
+import org.json4s.JsonDSL._
 
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Try}
 
-object AuditEvent {
-
-  implicit private val formats = DefaultFormats + AuditUUID.serializer + AccountId.serializer
-
-  def fromJsonString(json: String): Try[AuditEvent] = {
-    Try(parse(json)).flatMap { jsValueEvent =>
-      jsValueEvent \ "type" match {
-        case JString(eventType) if eventType == "HttpAuditEvent" =>
-          Success((jsValueEvent \ "event").camelizeKeys.extract[HttpAuditEvent])
-        case JString(eventType) if eventType == "ExtendedHttpAuditEvent" =>
-          Success((jsValueEvent \ "event").camelizeKeys.extract[ExtendedHttpAuditEvent])
-        case JString(eventType) if eventType == "SshAuditEvent" =>
-          Success((jsValueEvent \ "event").camelizeKeys.extract[SshAuditEvent])
-        case _ => Failure(new MappingException(s"Could not parse json into an audit event: $json"))
-      }
-    }
-  }
-
-  def toJsonString[T <: AuditEvent](auditEvent: T): String = {
-    compact(render(parse(write[T](auditEvent)).snakizeKeys))
-  }
-}
 
 sealed trait AuditEvent {
+  def auditType: String
+  def accessPath: Option[String]
   def sessionId: String
   def timeAtStart: Long
   def what: String
   def elapsed: Int
-  def uuid: AuditUUID
+  def uuid: String
+  def who: Option[Int]
+
+  implicit def formats: Formats = DefaultFormats
+
+  def toJsonString: String =
+    compact(
+      render(
+        parse(write[AuditEvent](this)).snakizeKeys merge JObject(JField("audit_type", auditType))
+      )
+    )
 }
 
 final case class SshAuditEvent(
+  accessPath: Option[String],
   sessionId: String,
-  who: Option[CurrentUser],
+  who: Option[Int],
   timeAtStart: Long,
   what: String,
   elapsed: Int,
-  uuid: AuditUUID
-) extends AuditEvent
+  uuid: String
+) extends AuditEvent {
+
+  override val auditType = SshAuditEvent.auditType
+
+}
+
+object SshAuditEvent extends AuditLogFieldExtractors {
+
+  val auditType = "SSH"
+
+  def apply(jsEvent: JValue): Try[SshAuditEvent] = Try {
+    SshAuditEvent(
+      accessPath(jsEvent),
+      sessionId(jsEvent),
+      who(jsEvent),
+      timeAtStart(jsEvent),
+      what(jsEvent),
+      elapsed(jsEvent),
+      uuid(jsEvent)
+    )
+  }
+}
 
 sealed trait BaseHttpAuditEvent extends AuditEvent {
   def httpMethod: String
   def httpStatus: Int
-  def who: CurrentUser
 }
 
 final case class HttpAuditEvent(
-    httpMethod: String,
-    httpStatus: Int,
-    sessionId: String,
-    who: CurrentUser,
-    timeAtStart: Long,
-    what: String,
-    elapsed: Int,
-    uuid: AuditUUID
-) extends BaseHttpAuditEvent
-
-final case class ExtendedHttpAuditEvent(
-    httpMethod: String,
-    httpStatus: Int,
-    sessionId: String,
-    who: CurrentUser,
-    timeAtStart: Long,
-    what: String,
-    elapsed: Int,
-    uuid: AuditUUID
-) extends BaseHttpAuditEvent
-
-final case class CurrentUser(
-  accessPath: String,
-  accountId: Option[AccountId]
-)
-
-final case class AccountId(id: Int)
-object AccountId {
-  val serializer = new CustomSerializer[AccountId]( _ =>
-      (
-        { case JObject(JField("id", JInt(id)) :: Nil) => AccountId(id.intValue()) },
-        { case a: AccountId => JInt(a.id) }
-      )
-  )
+  accessPath: Option[String],
+  httpMethod: String,
+  httpStatus: Int,
+  sessionId: String,
+  who: Option[Int],
+  timeAtStart: Long,
+  what: String,
+  elapsed: Int,
+  uuid: String
+) extends BaseHttpAuditEvent {
+  override val auditType = HttpAuditEvent.auditType
 }
 
-final case class AuditUUID(uuid: String)
+object HttpAuditEvent extends AuditLogFieldExtractors {
 
-object AuditUUID {
-  val serializer = new CustomSerializer[AuditUUID]( _ =>
-      (
-        { case JObject(JField("uuid", JString(uuid)) :: Nil) => AuditUUID(uuid) },
-        { case a: AuditUUID => JString(a.uuid) }
-      )
-  )
+  val auditType = "HTTP"
+
+  def apply(jsEvent: JValue): Try[HttpAuditEvent] = Try {
+    HttpAuditEvent(
+      accessPath(jsEvent),
+      httpMethod(jsEvent),
+      httpStatus(jsEvent),
+      sessionId(jsEvent),
+      who(jsEvent),
+      timeAtStart(jsEvent),
+      what(jsEvent),
+      elapsed(jsEvent),
+      uuid(jsEvent)
+    )
+  }
+}
+
+final case class ExtendedHttpAuditEvent(
+  accessPath: Option[String],
+  httpMethod: String,
+  httpStatus: Int,
+  sessionId: String,
+  who: Option[Int],
+  timeAtStart: Long,
+  what: String,
+  elapsed: Int,
+  uuid: String
+) extends BaseHttpAuditEvent {
+  override val auditType = ExtendedHttpAuditEvent.auditType
+}
+
+object ExtendedHttpAuditEvent extends AuditLogFieldExtractors {
+  val auditType = "EXTENDED_HTTP"
+
+  def apply(jsEvent: JValue): Try[ExtendedHttpAuditEvent] = Try {
+    ExtendedHttpAuditEvent(
+      accessPath(jsEvent),
+      httpMethod(jsEvent),
+      httpStatus(jsEvent),
+      sessionId(jsEvent),
+      who(jsEvent),
+      timeAtStart(jsEvent),
+      what(jsEvent),
+      elapsed(jsEvent),
+      uuid(jsEvent)
+    )
+  }
+}
+
+object AuditEvent {
+  implicit private val formats = DefaultFormats
+
+  def parseRaw(json: String): Try[AuditEvent] = {
+    Try(parse(json)).flatMap { jsValueEvent =>
+      jsValueEvent \ "type" match {
+        case JString(eventType) if eventType == "HttpAuditEvent" => HttpAuditEvent(jsValueEvent \ "event")
+        case JString(eventType) if eventType == "ExtendedHttpAuditEvent" => ExtendedHttpAuditEvent(jsValueEvent \ "event")
+        case JString(eventType) if eventType == "SshAuditEvent" => SshAuditEvent(jsValueEvent \ "event")
+        case _ => Failure(new MappingException(s"Could not parse json into an audit event: $json"))
+      }
+    }
+  }
 }
\ No newline at end of file
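`parseRaw` dispatches on the event's `type` discriminator; the branching can be sketched without json4s by taking the discriminator as a plain string (the real code extracts it from the wrapping JSON object first):

```scala
import scala.util.{Failure, Success, Try}

// Map the "type" discriminator to the audit_type tag each event class carries.
def auditTypeFor(eventType: String): Try[String] = eventType match {
  case "HttpAuditEvent"         => Success("HTTP")
  case "ExtendedHttpAuditEvent" => Success("EXTENDED_HTTP")
  case "SshAuditEvent"          => Success("SSH")
  case other                    => Failure(new IllegalArgumentException(s"Unknown audit event type: $other"))
}
```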
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
new file mode 100644
index 0000000..30e4c93
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/AuditLogETLConfig.scala
@@ -0,0 +1,29 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.model
+
+import java.time.LocalDate
+
+case class AuditLogETLConfig(
+  gerritUrl: Option[String] = None,
+  gerritUsername: Option[String] = None,
+  gerritPassword: Option[String] = None,
+  ignoreSSLCert: Option[Boolean] = None,
+  elasticSearchIndex: Option[String] = None,
+  since: Option[LocalDate] = None,
+  until: Option[LocalDate] = None,
+  eventsPath: Option[String] = None,
+  eventsTimeAggregation: Option[String] = Some("hour")
+)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
new file mode 100644
index 0000000..2ff6854
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/CommandLineArguments.scala
@@ -0,0 +1,67 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.model
+
+import java.time.LocalDate
+
+import com.gerritforge.analytics.support.ops.ReadsOps._
+import scopt.OptionParser
+
+object CommandLineArguments {
+
+  def apply(args: Array[String]): Option[AuditLogETLConfig] = {
+
+    val parser: OptionParser[AuditLogETLConfig] =
+      new scopt.OptionParser[AuditLogETLConfig]("spark-submit") {
+        head("spark-submit")
+        opt[String]('u', "gerritUrl") required () action { (input, c) =>
+          c.copy(gerritUrl = Some(input))
+        } text "gerrit server URL (Required)"
+
+        opt[String]("username") optional () action { (input, c) =>
+          c.copy(gerritUsername = Some(input))
+        } text "Gerrit API Username (Optional)"
+
+        opt[String]("password") optional () action { (input, c) =>
+          c.copy(gerritPassword = Some(input))
+        } text "Gerrit API Password (Optional)"
+
+        opt[String]('i', "elasticSearchIndex") required () action { (input, c) =>
+          c.copy(elasticSearchIndex = Some(input))
+        } text "elasticSearch index to persist data into (Required)"
+
+        opt[String]('p', "eventsPath") required () action { (input, c) =>
+          c.copy(eventsPath = Some(input))
+        } text "path to a directory (or a file) containing auditLog events. Also supports '.gz' files. (Required)"
+
+        opt[String]('a', "eventsTimeAggregation") optional () action { (input, c) =>
+          c.copy(eventsTimeAggregation = Some(input))
+        } text "Events of the same type, produced by the same user, will be aggregated with this time granularity: " +
+                 "'second', 'minute', 'hour', 'week', 'month', 'quarter'. (Optional) - Default: 'hour'"
+
+        opt[Boolean]('k', "ignoreSSLCert") optional () action { (input, c) =>
+          c.copy(ignoreSSLCert = Some(input))
+        } text "Ignore SSL certificate validation (Optional) - Default: false"
+
+        opt[LocalDate]('s', "since") optional () action { (input, c) => c.copy(since = Some(input))
+        } text "process only auditLogs that occurred after (and including) this date, expressed as 'yyyy-MM-dd' (Optional)"
+
+        opt[LocalDate]("until") optional () action { (input, c) => c.copy(until = Some(input))
+        } text "process only auditLogs that occurred before (and including) this date, expressed as 'yyyy-MM-dd' (Optional)"
+      }
+
+      parser.parse(args, AuditLogETLConfig())
+  }
+}
\ No newline at end of file
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
new file mode 100644
index 0000000..2721074
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -0,0 +1,39 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.model
+
+object ElasticSearchFields {
+  val TIME_BUCKET_FIELD     = "events_time_bucket"
+  val COMMAND_FIELD         = "command"
+  val COMMAND_ARGS_FIELD    = "command_arguments"
+  val USER_IDENTIFIER_FIELD = "user_identifier"
+  val AUDIT_TYPE_FIELD      = "audit_type"
+  val ACCESS_PATH_FIELD     = "access_path"
+
+  val FACETING_FIELDS = List(
+    TIME_BUCKET_FIELD,
+    AUDIT_TYPE_FIELD,
+    USER_IDENTIFIER_FIELD,
+    ACCESS_PATH_FIELD,
+    COMMAND_FIELD,
+    COMMAND_ARGS_FIELD
+  )
+
+  val NUM_EVENTS_FIELD = "num_events"
+
+  val ALL_DOCUMENT_FIELDS: List[String] = FACETING_FIELDS :+ NUM_EVENTS_FIELD
+
+  val DOCUMENT_TYPE = "auditlog"
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/json/AuditLogFieldExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/json/AuditLogFieldExtractors.scala
new file mode 100644
index 0000000..73da840
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/json/AuditLogFieldExtractors.scala
@@ -0,0 +1,32 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.model.json
+
+import org.json4s.{DefaultFormats, JValue}
+
+trait AuditLogFieldExtractors {
+  implicit private val formats = DefaultFormats
+
+  val accessPath  = (jsEvent: JValue) => (jsEvent \ "who" \ "access_path").extractOpt[String]
+  val sessionId   = (jsEvent: JValue) => (jsEvent \ "session_id").extract[String]
+  val timeAtStart = (jsEvent: JValue) => (jsEvent \ "time_at_start").extract[Long]
+  val what        = (jsEvent: JValue) => (jsEvent \ "what").extract[String]
+  val elapsed     = (jsEvent: JValue) => (jsEvent \ "elapsed").extract[Int]
+  val uuid        = (jsEvent: JValue) => (jsEvent \ "uuid" \ "uuid").extract[String]
+  val who         = (jsEvent: JValue) => (jsEvent \ "who" \ "account_id" \ "id").extractOpt[Int]
+  val httpMethod  = (jsEvent: JValue) => (jsEvent \ "http_method").extract[String]
+  val httpStatus  = (jsEvent: JValue) => (jsEvent \ "http_status").extract[Int]
+
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala
new file mode 100644
index 0000000..83071f0
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/range/TimeRange.scala
@@ -0,0 +1,33 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.range
+
+import java.time.LocalDate
+
+import com.gerritforge.analytics.support.ops.implicits._
+import com.google.inject.Singleton
+
+@Singleton
+case class TimeRange(since: Option[LocalDate], until: Option[LocalDate]) {
+
+  private val maybeSinceMs: Option[Long] = since.map(_.atStartOfDay().convertToUTCEpochMillis)
+  private val maybeUntilMs: Option[Long] = until.map(_.atStartOfDay().convertToUTCEpochMillis)
+
+  def isWithin(timeMs: Long): Boolean = maybeSinceMs.forall(_ <= timeMs) && maybeUntilMs.forall(_ >= timeMs)
+}
+
+object TimeRange {
+  val always = TimeRange(None, None)
+}
\ No newline at end of file
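`isWithin` treats each missing bound as unbounded because `Option.forall` is vacuously true on `None`; the same logic standalone:

```scala
// Open-ended range check: a None bound never excludes a timestamp,
// a Some bound is inclusive on both ends.
def within(sinceMs: Option[Long], untilMs: Option[Long], timeMs: Long): Boolean =
  sinceMs.forall(_ <= timeMs) && untilMs.forall(_ >= timeMs)
```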
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
new file mode 100644
index 0000000..1b06829
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -0,0 +1,39 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark
+
+import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.model.AuditEvent
+import com.gerritforge.analytics.auditlog.model.ElasticSearchFields._
+import com.gerritforge.analytics.auditlog.range.TimeRange
+import com.gerritforge.analytics.auditlog.spark.dataframe.ops.DataFrameOps._
+import com.gerritforge.analytics.auditlog.spark.rdd.ops.SparkRDDOps._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+case class AuditLogsTransformer(gerritIdentifiers: GerritUserIdentifiers = GerritUserIdentifiers.empty)(implicit spark: SparkSession) {
+
+  private val broadcastUserIdentifiers = spark.sparkContext.broadcast(gerritIdentifiers)
+
+  def transform(auditEventsRDD: RDD[AuditEvent], timeAggregation: String, timeRange: TimeRange = TimeRange.always): DataFrame =
+    auditEventsRDD
+      .filterWithinRange(TimeRange(timeRange.since, timeRange.until))
+      .toJsonString
+      .toJsonTableDataFrame
+      .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
+      .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
+      .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
+      .aggregateNumEventsColumn(NUM_EVENTS_FIELD, FACETING_FIELDS)
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
new file mode 100644
index 0000000..f1ddd9b
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -0,0 +1,62 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark.dataframe.ops
+
+import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF}
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions._
+
+import scala.util.Try
+
+
+object DataFrameOps {
+
+  implicit class PimpedAuditLogDataFrame(dataFrame: DataFrame) {
+
+    private def hasColumn(path: String) = Try(dataFrame(path)).isSuccess
+
+    private def ifExistThenGetOrNull(column: String, targetColumn: => Column) = if (hasColumn(column)) targetColumn else lit(null)
+
+    def hydrateWithUserIdentifierColumn(userIdentifierCol: String, gerritAccounts: GerritUserIdentifiers): DataFrame = {
+      def extractIdentifier: UserDefinedFunction = udf((who: Int) => gerritAccounts.getIdentifier(who))
+
+      dataFrame.withColumn(userIdentifierCol, ifExistThenGetOrNull("who", extractIdentifier(col("who"))))
+
+    }
+
+    def withTimeBucketColum(timeBucketCol: String, timeAggregation: String): DataFrame = {
+      dataFrame
+        .withColumn(timeBucketCol, date_trunc(format = timeAggregation, from_unixtime(col("time_at_start").divide(1000))))
+    }
+
+    def withCommandColumns(commandCol: String, commandArgsCol: String): DataFrame = {
+      dataFrame
+        .withColumn(commandCol,
+          extractCommandUDF(
+            col("what"),
+            col("access_path"),
+            ifExistThenGetOrNull("http_method", col("http_method"))))
+        .withColumn(commandArgsCol, extractCommandArgumentsUDF(col("what"), col("access_path")))
+    }
+
+    def aggregateNumEventsColumn(numEventsCol: String, cols: List[String]): DataFrame = {
+      dataFrame.groupBy(cols.map(c => col(c)): _*)
+        .agg(count("*")
+        .alias(numEventsCol))
+    }
+  }
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
new file mode 100644
index 0000000..4602aee
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/rdd/ops/SparkRDDOps.scala
@@ -0,0 +1,45 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark.rdd.ops
+
+import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.model.AuditEvent
+import com.gerritforge.analytics.auditlog.range.TimeRange
+import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object SparkRDDOps {
+
+  implicit class PimpedAuditEventRDD(rdd: RDD[AuditEvent]) {
+    def filterWithinRange(timeRange: TimeRange): RDD[AuditEvent] =
+      rdd.filter(event => timeRange.isWithin(event.timeAtStart))
+
+    def toJsonString: RDD[String] = rdd.map(_.toJsonString)
+
+    def transformEvents(gerritUserIdentifiers: GerritUserIdentifiers, timeAggregation: String, timeRange: TimeRange)
+                       (implicit spark: SparkSession): DataFrame =
+      AuditLogsTransformer(gerritUserIdentifiers)
+        .transform(rdd, timeAggregation, timeRange)
+  }
+
+  implicit class PimpedStringRDD(rdd: RDD[String]) {
+    def toJsonTableDataFrame(implicit spark: SparkSession): DataFrame = {
+      import spark.implicits._
+      spark.sqlContext.read.json(rdd.toDS())
+    }
+  }
+
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala
new file mode 100644
index 0000000..0d221ee
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/session/ops/SparkSessionOps.scala
@@ -0,0 +1,31 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark.session.ops
+
+import com.gerritforge.analytics.auditlog.model.AuditEvent
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+object SparkSessionOps {
+
+  implicit class PimpedSparkSession(spark: SparkSession) {
+    def getEventsFromPath(path: String): RDD[AuditEvent] =
+      spark
+        .read
+        .textFile(path)
+        .rdd
+        .flatMap(auditString => AuditEvent.parseRaw(auditString).toOption)
+  }
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
new file mode 100644
index 0000000..013213f
--- /dev/null
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractors.scala
@@ -0,0 +1,62 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark.sql.udf
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions.udf
+
+import scala.util.matching.Regex
+
+case object SparkExtractors extends LazyLogging {
+  private val GERRIT_SSH_COMMAND = new Regex("""^(.+?)\.""", "capture")
+  private val GERRIT_SSH_COMMAND_ARGUMENTS = new Regex("""^.+?\.(.+)""", "capture")
+
+  private val GIT_COMMAND = new Regex(""".*(git-upload-pack|git-receive-pack)""", "capture")
+  private val GIT_SSH_COMMAND_ARGUMENTS = new Regex("""git-(?:upload|receive)-pack\.(.+)""", "capture")
+  private val GIT_HTTP_COMMAND_ARGUMENTS = new Regex("""(^http.*)""", "capture")
+
+  val FAILED_SSH_AUTH = "FAILED_SSH_AUTH"
+
+  private def extractOrElse(rx: Regex, target: String, default: String): String = extractGroup(rx, target).getOrElse(default)
+
+  private def extractGroup(rx: Regex, target: String): Option[String] = rx.findAllMatchIn(target).toList.headOption.map(_.group("capture"))
+
+  def extractCommand(what: String, accessPath: String, httpMethod: String = null): String = accessPath match {
+    case "SSH_COMMAND"          => extractOrElse(GERRIT_SSH_COMMAND, what, what)
+    case "GIT"                  => extractOrElse(GIT_COMMAND, what, what)
+    case "REST_API"|"UNKNOWN"   => Option(httpMethod).getOrElse(what)
+    case "JSON_RPC"             => what
+    case null if what == "AUTH" => FAILED_SSH_AUTH
+    case unexpected             =>
+      logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command from '$what'")
+      what
+  }
+
+  def extractCommandUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String, httpMethod: String) => extractCommand(rawCommand, accessPath, httpMethod))
+
+  def extractCommandArguments(what: String, accessPath: String): Option[String] = accessPath match {
+    case "SSH_COMMAND"          => extractGroup(GERRIT_SSH_COMMAND_ARGUMENTS, what)
+    case "GIT"                  => Option(extractGroup(GIT_SSH_COMMAND_ARGUMENTS, what).getOrElse(extractOrElse(GIT_HTTP_COMMAND_ARGUMENTS, what, null)))
+    case "REST_API"|"UNKNOWN"   => Some(what)
+    case "JSON_RPC"             => None
+    case null if what == "AUTH" => None
+    case unexpected             =>
+      logger.warn(s"Unexpected access path '$unexpected' encountered when extracting command arguments from '$what'")
+      None
+  }
+
+  def extractCommandArgumentsUDF: UserDefinedFunction = udf((rawCommand: String, accessPath: String) => extractCommandArguments(rawCommand, accessPath))
+}
\ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
new file mode 100644
index 0000000..dcc9f92
--- /dev/null
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/AuditLogsTransformerSpec.scala
@@ -0,0 +1,195 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog
+import java.sql
+
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
+import com.gerritforge.analytics.auditlog.model.{ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
+import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
+import com.gerritforge.analytics.support.ops.CommonTimeOperations._
+import org.apache.spark.sql.Row
+import org.scalatest.{FlatSpec, Matchers}
+
+class AuditLogsTransformerSpec extends FlatSpec with Matchers with SparkTestSupport with TestFixtures {
+  behavior of "AuditLogsTransformer"
+
+  it should "process an anonymous http audit entry" in {
+    val events = Seq(anonymousHttpAuditEvent)
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 1
+    dataFrame.collect should contain only Row(
+        AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+        HttpAuditEvent.auditType,
+        null, // no user identifier
+        anonymousHttpAuditEvent.accessPath.get,
+        GIT_UPLOAD_PACK,
+        anonymousHttpAuditEvent.what,
+        expectedAggregatedCount
+    )
+  }
+
+  it should "process an authenticated http audit entry where the gerrit account couldn't be identified" in {
+    val events = Seq(authenticatedHttpAuditEvent)
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 1
+    dataFrame.collect should contain only Row(
+      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+      HttpAuditEvent.auditType,
+      s"${authenticatedHttpAuditEvent.who.get}",
+      authenticatedHttpAuditEvent.accessPath.get,
+      GIT_UPLOAD_PACK,
+      authenticatedHttpAuditEvent.what,
+      expectedAggregatedCount
+    )
+  }
+
+  it should "process an authenticated http audit entry where the gerrit account could be identified" in {
+    val events = Seq(authenticatedHttpAuditEvent)
+    val gerritUserIdentifier = "Antonio Barone"
+
+    val dataFrame =
+      AuditLogsTransformer(GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)))
+        .transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 1
+    dataFrame.collect should contain only Row(
+      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+      HttpAuditEvent.auditType,
+      gerritUserIdentifier,
+      authenticatedHttpAuditEvent.accessPath.get,
+      GIT_UPLOAD_PACK,
+      authenticatedHttpAuditEvent.what,
+      expectedAggregatedCount
+    )
+  }
+
+  it should "process an SSH audit entry" in {
+    val events = Seq(sshAuditEvent)
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 1
+    dataFrame.collect should contain only Row(
+      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+      SshAuditEvent.auditType,
+      s"${sshAuditEvent.who.get}",
+      sshAuditEvent.accessPath.get,
+      SSH_GERRIT_COMMAND,
+      SSH_GERRIT_COMMAND_ARGUMENTS,
+      expectedAggregatedCount
+    )
+  }
+
+  it should "group SSH events from the same user together, if they fall within the same time bucket (hour)" in {
+    val events = Seq(sshAuditEvent, sshAuditEvent.copy(timeAtStart = timeAtStart + 1000))
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 2
+    dataFrame.collect should contain only Row(
+      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+      SshAuditEvent.auditType,
+      s"${sshAuditEvent.who.get}",
+      sshAuditEvent.accessPath.get,
+      SSH_GERRIT_COMMAND,
+      SSH_GERRIT_COMMAND_ARGUMENTS,
+      expectedAggregatedCount
+    )
+  }
+
+  it should "group SSH events from different users separately, even if they fall within the same time bucket (hour)" in {
+    val user2Id = sshAuditEvent.who.map(_ + 1)
+    val events = Seq(sshAuditEvent, sshAuditEvent.copy(who=user2Id))
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedAggregatedCount = 1
+    dataFrame.collect should contain allOf (
+      Row(
+        AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+        SshAuditEvent.auditType,
+        s"${sshAuditEvent.who.get}",
+        sshAuditEvent.accessPath.get,
+        SSH_GERRIT_COMMAND,
+        SSH_GERRIT_COMMAND_ARGUMENTS,
+        expectedAggregatedCount
+      ),
+      Row(
+        AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
+        SshAuditEvent.auditType,
+        s"${user2Id.get}",
+        sshAuditEvent.accessPath.get,
+        SSH_GERRIT_COMMAND,
+        SSH_GERRIT_COMMAND_ARGUMENTS,
+        expectedAggregatedCount
+      )
+    )
+  }
+
+  it should "group different event types separately, even if they fall within the same time bucket (hour)" in {
+    val events = Seq(sshAuditEvent, authenticatedHttpAuditEvent.copy(timeAtStart = sshAuditEvent.timeAtStart + 1000))
+
+    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation="hour")
+
+    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
+
+    val expectedSshAggregatedCount = 1
+    val expectedHttpAggregatedCount = 1
+    dataFrame.collect should contain allOf (
+      Row(
+        AuditLogsTransformerSpec.epochMillisToNearestHour(events.head.timeAtStart),
+        SshAuditEvent.auditType,
+        s"${sshAuditEvent.who.get}",
+        sshAuditEvent.accessPath.get,
+        SSH_GERRIT_COMMAND,
+        SSH_GERRIT_COMMAND_ARGUMENTS,
+        expectedSshAggregatedCount
+      ),
+      Row(
+        AuditLogsTransformerSpec.epochMillisToNearestHour(events.last.timeAtStart),
+        HttpAuditEvent.auditType,
+        s"${authenticatedHttpAuditEvent.who.get}",
+        authenticatedHttpAuditEvent.accessPath.get,
+        GIT_UPLOAD_PACK,
+        authenticatedHttpAuditEvent.what,
+        expectedHttpAggregatedCount
+      )
+    )
+  }
+}
+
+object AuditLogsTransformerSpec {
+  def epochMillisToNearestHour(epochMillis: Long): sql.Timestamp = {
+    val millisInOneHour = 3600000
+    epochToSqlTimestampOps(epochMillis - (epochMillis % millisInOneHour))
+  }
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
new file mode 100644
index 0000000..02dad6a
--- /dev/null
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/TestFixtures.scala
@@ -0,0 +1,44 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog
+import com.gerritforge.analytics.auditlog.model.{HttpAuditEvent, SshAuditEvent}
+
+trait TestFixtures {
+
+  val userId = 123
+  val sessionId = "someSessionId"
+  val gitAccessPath = "GIT"
+  val timeAtStart = 1544802407000L
+  val elapsed = 12
+  val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
+
+  val GIT_UPLOAD_PACK = "git-upload-pack"
+
+  val httpMethod = "GET"
+  val httpStatus = 200
+
+  val httpWhat = s"https://review.gerrithub.io/Mirantis/tcp-qa/$GIT_UPLOAD_PACK"
+
+  val anonymousHttpAuditEvent = HttpAuditEvent(Some(gitAccessPath), httpMethod, httpStatus, sessionId, None, timeAtStart, httpWhat, elapsed, uuid)
+  val authenticatedHttpAuditEvent: HttpAuditEvent = anonymousHttpAuditEvent.copy(who=Some(userId))
+
+  val sshAccessPath  = "SSH_COMMAND"
+  val SSH_GERRIT_COMMAND = "gerrit"
+  val SSH_GERRIT_COMMAND_ARGUMENTS = "stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+
+  val sshWhat        = s"$SSH_GERRIT_COMMAND.$SSH_GERRIT_COMMAND_ARGUMENTS"
+
+  val sshAuditEvent = SshAuditEvent(Some(sshAccessPath), sessionId, Some(userId), timeAtStart, sshWhat, elapsed, uuid)
+}
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
index 82c9988..33911f0 100644
--- a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/model/AuditEventSpec.scala
@@ -1,4 +1,19 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package com.gerritforge.analytics.auditlog.model
+
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.MappingException
@@ -9,17 +24,17 @@
 
 class AuditEventSpec extends FlatSpec with Matchers with Inside {
 
-  behavior of "fromJsonString"
+  behavior of "parseRaw"
 
   "parsing a string that is not json" should "result in a ParseException failure" in {
     val someJson = """this_is_not_a_valid_json"""
-    AuditEvent.fromJsonString(someJson).failure.exception shouldBe a[ParseException]
+    AuditEvent.parseRaw(someJson).failure.exception shouldBe a[ParseException]
   }
 
   "parsing a json not representing an audit event log" should "result in a MappingException failure" in {
     val someJson = """{"some": "json", "not": "expected"}"""
 
-    AuditEvent.fromJsonString(someJson).failure.exception shouldBe a[MappingException]
+    AuditEvent.parseRaw(someJson).failure.exception shouldBe a[MappingException]
   }
 
   "A json representing an http audit event log" should "be parsed into a success of HttpAuditEvent" in {
@@ -29,7 +44,7 @@
     val sessionId  = "1r7ywi4vd3jk410dv60pvd19vk"
     val accountId  = 1009124
     val accessPath = "GIT"
-    val timeAtStart       = 1542240322369L
+    val timeAtStart = 1542240322369L
     val what       = "https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
     val elapsed    = 12
     val auditUUID = "audit:fe4cff68-d094-474a-9d97-502270b0b2e6"
@@ -67,17 +82,19 @@
          |}
        """.stripMargin
 
-    val triedEvent = AuditEvent.fromJsonString(jsonEvent)
+    val triedEvent = AuditEvent.parseRaw(jsonEvent)
     inside (triedEvent.success.value) {
-      case HttpAuditEvent(gotHttpMethod, gotHttpStatus, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
+      case HttpAuditEvent(gotAccessPath,gotHttpMethod,gotHttpStatus,gotSessionId,gotWho,gotTimeAtStart,gotWhat,gotElapsed,gotUUID
+      ) =>
+        gotSessionId   shouldBe sessionId
+        gotWho         should contain(accountId)
+        gotTimeAtStart shouldBe timeAtStart
         gotHttpMethod  shouldBe httpMethod
         gotHttpStatus  shouldBe httpStatus
-        gotSessionId   shouldBe sessionId
-        gotWho         shouldBe CurrentUser(accessPath = accessPath, accountId = Some(AccountId(accountId)))
-        gotTimeAtStart shouldBe timeAtStart
         gotWhat        shouldBe what
         gotElapsed     shouldBe elapsed
-        gotUUID.uuid   shouldBe auditUUID
+        gotUUID        shouldBe auditUUID
+        gotAccessPath  should contain(accessPath)
     }
   }
 
@@ -116,14 +133,15 @@
          |}
        """.stripMargin
 
-    inside (AuditEvent.fromJsonString(jsonEvent).success.value) {
-      case SshAuditEvent(gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
+    inside (AuditEvent.parseRaw(jsonEvent).success.value) {
+      case SshAuditEvent(gotAccessPath, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
         gotSessionId   shouldBe sessionId
-        gotWho         shouldBe Some(CurrentUser(accessPath = accessPath, accountId = Some(AccountId(accountId))))
+        gotWho         should contain(accountId)
         gotTimeAtStart shouldBe timeAtStart
         gotWhat        shouldBe what
         gotElapsed     shouldBe elapsed
-        gotUUID.uuid   shouldBe auditUUID
+        gotUUID        shouldBe auditUUID
+        gotAccessPath  should contain(accessPath)
     }
   }
 
@@ -153,14 +171,15 @@
          |}
        """.stripMargin
 
-    inside (AuditEvent.fromJsonString(jsonEvent).success.value) {
-      case SshAuditEvent(gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
+    inside (AuditEvent.parseRaw(jsonEvent).success.value) {
+      case SshAuditEvent(gotAccessPath, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
         gotSessionId   shouldBe sessionId
-        gotWho         shouldBe None
+        gotWho         shouldBe empty
         gotTimeAtStart shouldBe timeAtStart
         gotWhat        shouldBe what
         gotElapsed     shouldBe elapsed
-        gotUUID.uuid   shouldBe auditUUID
+        gotUUID        shouldBe auditUUID
+        gotAccessPath  shouldBe empty
     }
   }
 
@@ -202,16 +221,17 @@
          |}
        """.stripMargin
 
-    inside (AuditEvent.fromJsonString(jsonEvent).success.value) {
-      case ExtendedHttpAuditEvent(gotHttpMethod, gotHttpStatus, gotSessionId, gotWho, gotTimeAtStart, gotWhat, gotElapsed, gotUUID) =>
-        gotHttpMethod  shouldBe httpMethod
-        gotHttpStatus  shouldBe httpStatus
+    inside (AuditEvent.parseRaw(jsonEvent).success.value) {
+      case ExtendedHttpAuditEvent(gotAccessPath,gotHttpMethod,gotHttpStatus,gotSessionId,gotWho,gotTimeAtStart,gotWhat,gotElapsed,gotUUID) =>
         gotSessionId   shouldBe sessionId
-        gotWho         shouldBe CurrentUser(accessPath = accessPath, accountId = Some(AccountId(accountId)))
+        gotWho         should   contain(accountId)
         gotTimeAtStart shouldBe timeAtStart
+        gotHttpMethod  shouldBe httpMethod
         gotWhat        shouldBe what
+        gotHttpStatus  shouldBe httpStatus
         gotElapsed     shouldBe elapsed
-        gotUUID.uuid   shouldBe auditUUID
+        gotUUID        shouldBe auditUUID
+        gotAccessPath  should   contain(accessPath)
     }
   }
 
@@ -222,81 +242,80 @@
     val httpMethod = "GET"
     val httpStatus = 200
     val sessionId = "someSessionId"
-    val who = CurrentUser(accessPath = "UNKNOWN", accountId = None)
+    val accessPath = "GIT"
     val timeAtStart = 1000L
+    val elapsed = 12
     val what="https://review.gerrithub.io/Mirantis/tcp-qa/git-upload-pack"
-    val elapsed = 22
-    val uuid = AuditUUID("audit:5f10fea5-35d1-4252-b86f-99db7a9b549b")
+    val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
 
-    val event = HttpAuditEvent(httpMethod, httpStatus, sessionId, who, timeAtStart, what, elapsed, uuid)
+    val event = HttpAuditEvent(
+      Some(accessPath), httpMethod, httpStatus, sessionId, None, timeAtStart, what, elapsed, uuid)
 
     val expectedJson: JValue =
+      ("session_id" -> sessionId) ~
+      ("access_path" -> accessPath) ~
+      ("time_at_start" -> timeAtStart) ~
       ("http_method" -> httpMethod) ~
-        ("http_status" -> httpStatus) ~
-        ("session_id" -> sessionId) ~
-        ("who" ->
-          ("access_path" -> who.accessPath)
-        ) ~
-        ("time_at_start" -> timeAtStart) ~
-        ("what" -> what) ~
-        ("elapsed" -> elapsed) ~
-        ("uuid" -> uuid.uuid)
+      ("http_status" -> httpStatus) ~
+      ("what" -> what) ~
+      ("elapsed" -> elapsed) ~
+      ("uuid" -> uuid) ~
+      ("audit_type" -> HttpAuditEvent.auditType)
 
-    parse(AuditEvent.toJsonString(event)) shouldBe expectedJson
+    parse(event.toJsonString) shouldBe expectedJson
   }
 
   "an ExtendedHttpAuditEvent" should "be serializable into json" in {
 
     val httpMethod = "GET"
     val httpStatus = 200
+    val accessPath = "REST_API"
     val sessionId = "someSessionId"
     val accountId = 123
-    val who = CurrentUser(accessPath = "/config/server/info", accountId = Some(AccountId(accountId)))
     val timeAtStart = 1000L
     val what="/config/server/info"
     val elapsed = 22
-    val uuid = AuditUUID("audit:5f10fea5-35d1-4252-b86f-99db7a9b549b")
+    val uuid = "audit:5f10fea5-35d1-4252-b86f-99db7a9b549b"
 
-    val event = ExtendedHttpAuditEvent(httpMethod, httpStatus, sessionId, who, timeAtStart, what, elapsed, uuid)
+    val event = ExtendedHttpAuditEvent(Some(accessPath), httpMethod, httpStatus, sessionId, Some(accountId), timeAtStart, what, elapsed, uuid)
 
     val expectedJson: JValue =
+      ("session_id" -> sessionId) ~
+      ("who" -> accountId) ~
+      ("access_path" -> accessPath) ~
+      ("time_at_start" -> timeAtStart) ~
       ("http_method" -> httpMethod) ~
       ("http_status" -> httpStatus) ~
-      ("session_id" -> sessionId) ~
-      ("who" ->
-        ("access_path" -> who.accessPath) ~
-        ("account_id" -> accountId)
-      ) ~
-      ("time_at_start" -> timeAtStart) ~
       ("what" -> what) ~
       ("elapsed" -> elapsed) ~
-      ("uuid" -> uuid.uuid)
+      ("uuid" -> uuid) ~
+      ("audit_type" -> ExtendedHttpAuditEvent.auditType)
 
-    parse(AuditEvent.toJsonString(event)) shouldBe expectedJson
+    parse(event.toJsonString) shouldBe expectedJson
   }
 
   "an SshAuditEvent" should "be serializable into json" in {
+    val sshAuditEvent = "SSH"
     val sessionId   = "2adc5bef"
     val accountId   = 1009124
     val accessPath  = "SSH_COMMAND"
     val timeAtStart = 1542240322369L
     val what        = "gerrit.stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
     val elapsed     = 12
-    val auditUUID   = "audit:dd74e098-9260-4720-9143-38a0a0a5e500"
+    val uuid   = "audit:dd74e098-9260-4720-9143-38a0a0a5e500"
 
-    val event = SshAuditEvent(sessionId, Some(CurrentUser(accessPath, Some(AccountId(accountId)))), timeAtStart, what, elapsed, AuditUUID(auditUUID))
+    val event = SshAuditEvent(Some(accessPath), sessionId, Some(accountId), timeAtStart, what, elapsed, uuid)
 
     val expectedJson: JValue =
       ("session_id" -> sessionId) ~
-      ("who" ->
-        ("access_path" -> accessPath) ~
-        ("account_id" -> accountId)
-      ) ~
+      ("who" -> accountId) ~
+      ("access_path" -> accessPath) ~
       ("time_at_start" -> timeAtStart) ~
       ("what" -> what) ~
       ("elapsed" -> elapsed) ~
-      ("uuid" -> auditUUID)
+      ("uuid" -> uuid) ~
+      ("audit_type" -> sshAuditEvent)
 
-    parse(AuditEvent.toJsonString(event)) shouldBe expectedJson
+    parse(event.toJsonString) shouldBe expectedJson
   }
 }
\ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala
new file mode 100644
index 0000000..db4f264
--- /dev/null
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/range/TimeRangeSpec.scala
@@ -0,0 +1,71 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.range
+
+import java.time.{Instant, LocalDate}
+import com.gerritforge.analytics.support.ops.implicits._
+
+import org.scalatest.{FlatSpec, Inside, Matchers}
+import TimeRangeSpec._
+
+class TimeRangeSpec extends FlatSpec with Matchers with Inside {
+  behavior of "isWithin"
+
+  it should "always return true when time range is boundless" in {
+    val range = TimeRange(None, None)
+
+    range.isWithin(nowMs) shouldBe true
+  }
+
+  it should "return true when 'until' is unbounded and time is greater than 'since'" in {
+    val range = TimeRange(Some(yesterday), None)
+
+    range.isWithin(nowMs) shouldBe true
+  }
+
+  it should "return false when 'until' is unbounded and time is less than 'since'" in {
+    val range = TimeRange(Some(now), None)
+
+    range.isWithin(yesterdayMs) shouldBe false
+  }
+
+  it should "return true when 'since' is unbounded and time is less than 'until'" in {
+    val range = TimeRange(None, Some(tomorrow))
+
+    range.isWithin(nowMs) shouldBe true
+  }
+
+  it should "return false when 'since' is unbounded and time is greater than 'until'" in {
+    val range = TimeRange(None, Some(now))
+
+    range.isWithin(tomorrowMs) shouldBe false
+  }
+
+  it should "return true when time is within bounded range" in {
+    val range = TimeRange(Some(yesterday), Some(tomorrow))
+
+    range.isWithin(nowMs) shouldBe true
+  }
+}
+
+object TimeRangeSpec {
+  val yesterday: LocalDate = LocalDate.now().minusDays(1)
+  val tomorrow: LocalDate = LocalDate.now().plusDays(1)
+  val now: LocalDate = LocalDate.now()
+  val nowMs: Long = Instant.now().toEpochMilli
+  val yesterdayMs: Long = yesterday.atStartOfDay().convertToUTCEpochMillis
+  val tomorrowMs: Long = tomorrow.atStartOfDay().convertToUTCEpochMillis
+
+}
\ No newline at end of file
diff --git a/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
new file mode 100644
index 0000000..4ec9210
--- /dev/null
+++ b/auditlog/src/test/scala/com/gerritforge/analytics/auditlog/spark/sql/udf/SparkExtractorsSpec.scala
@@ -0,0 +1,190 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.auditlog.spark.sql.udf
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class SparkExtractorsSpec extends FlatSpec with Matchers {
+  behavior of "extractCommand"
+
+  it should "extract gerrit command from gerrit ssh command" in {
+    val what = s"gerrit.command.-f.with.some-r.options"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "gerrit"
+  }
+
+  it should "extract replication command from replication ssh command" in {
+    val what = s"replication.start.GerritCodeReview/*"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "replication"
+  }
+
+  it should "extract 'LOGIN' command over SSH" in {
+    val what = s"LOGIN"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe what
+  }
+
+  it should "extract 'LOGOUT' command over SSH" in {
+    val what = s"LOGOUT"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe what
+  }
+
+  it should "extract GIT 'git-upload-pack' command over SSH" in {
+    val what = s"git-upload-pack./spdk/spdk"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "git-upload-pack"
+  }
+
+  it should "extract GIT 'git-receive-pack' command over SSH" in {
+    val what = s"git-receive-pack./spdk/spdk"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "git-receive-pack"
+  }
+
+  it should "extract GIT 'git-upload-pack' command over HTTP" in {
+    val what = s"https://review.gerrithub.io/rhos-infra/patch-components/git-upload-pack"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "git-upload-pack"
+  }
+
+  it should "extract GIT 'git-receive-pack' command over HTTP" in {
+    val what = s"https://review.gerrithub.io/spdk/spdk/info/refs?service=git-receive-pack"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe "git-receive-pack"
+  }
+
+  it should "extract 'LOGOUT' command over GIT" in {
+    val what = s"LOGOUT"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe what
+  }
+
+  it should "extract http method for rest api calls" in {
+    val what = s"/config/server/version"
+    val accessPath = "REST_API"
+    val httpMethod = "GET"
+
+    SparkExtractors.extractCommand(what, accessPath, httpMethod) shouldBe httpMethod
+  }
+
+  it should "extract http method for an unknown access path" in {
+    val what = s"/config/server/version"
+    val accessPath = "UNKNOWN"
+    val httpMethod = "PUT"
+
+    SparkExtractors.extractCommand(what, accessPath, httpMethod) shouldBe httpMethod
+  }
+
+  it should "extract 'what' when access path is an unexpected value" in {
+    val what = s"any"
+    val accessPath = "unexpected"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe what
+  }
+
+  it should "extract 'what' when access path is JSON_RPC" in {
+    val what = s"ProjectAdminService.changeProjectAccess"
+    val accessPath = "JSON_RPC"
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe what
+  }
+
+  it should "extract failed SSH authentication when no access path is provided and what is AUTH" in {
+    val what = s"AUTH"
+    val accessPath = null
+
+    SparkExtractors.extractCommand(what, accessPath) shouldBe SparkExtractors.FAILED_SSH_AUTH
+  }
+
+  behavior of "extractCommandArguments"
+
+  it should "extract SSH command arguments" in {
+    val sshArguments = "stream-events.-s.patchset-created.-s.change-restored.-s.comment-added"
+    val what = s"gerrit.$sshArguments"
+    val accessPath = "SSH_COMMAND"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) should contain(sshArguments)
+  }
+
+  it should "extract GIT command arguments when in the form git-upload-pack.<gitArguments>" in {
+    val gitArguments = "/spdk/spdk.github.io"
+    val what = s"git-upload-pack.$gitArguments"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) should contain(gitArguments)
+  }
+
+  it should "extract GIT command arguments when in the form git-receive-pack.<gitArguments>" in {
+    val gitArguments = "/spdk/spdk.github.io"
+    val what = s"git-receive-pack.$gitArguments"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) should contain(gitArguments)
+  }
+
+  it should "extract GIT commands over HTTP endpoint as-is" in {
+    val what = "https://review.gerrithub.io/redhat-openstack/infrared.git/git-upload-pack"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) should contain(what)
+  }
+
+  it should "extract empty arguments for 'LOGOUT' commands" in {
+    val what = "LOGOUT"
+    val accessPath = "GIT"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
+  }
+
+  it should "extract REST API endpoint as-is" in {
+    val what = "/changes/ffilz%2Fnfs-ganesha~372229/comments"
+    val accessPath = "REST_API"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) should contain(what)
+  }
+
+  it should "extract empty arguments for a failed SSH authentication" in {
+    val what = s"AUTH"
+    val accessPath = null
+
+    SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
+  }
+
+  it should "extract empty arguments for a JSON_RPC access path" in {
+    val what = s"some_command"
+    val accessPath = "JSON_RPC"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
+  }
+
+  it should "extract empty arguments for an unexpected access path" in {
+    val what = s"any"
+    val accessPath = "unexpected"
+
+    SparkExtractors.extractCommandArguments(what, accessPath) shouldBe empty
+  }
+
+}
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 51c0448..1eace17 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,7 +22,7 @@
         .cmd(s"/bin/sh", s"$entryPointBase/gerrit-analytics-etl-gitcommits.sh")
     }
   )
-  .dependsOn(common)
+  .dependsOn(common % "compile->compile;test->test")
 
 lazy val analyticsETLAuditLog = (project in file("auditlog"))
   .enablePlugins(GitVersioning)
@@ -37,7 +37,7 @@
       baseDockerfile(projectName="auditlog", artifact, artifactTargetPath=s"$entryPointBase/${name.value}-assembly.jar")
     }
   )
-  .dependsOn(common)
+  .dependsOn(common % "compile->compile;test->test")
 
 lazy val root = (project in file("."))
   .disablePlugins(AssemblyPlugin)
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
index eb9394b..8d68514 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/TrustAll.scala
@@ -1,3 +1,17 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package com.gerritforge.analytics.common.api
 
 import java.security.cert.X509Certificate
diff --git a/common/src/main/scala/com/gerritforge/analytics/spark/SparkApp.scala b/common/src/main/scala/com/gerritforge/analytics/spark/SparkApp.scala
new file mode 100644
index 0000000..b860100
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/spark/SparkApp.scala
@@ -0,0 +1,26 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.spark
+import org.apache.spark.sql.SparkSession
+
+trait SparkApp {
+
+  def appName: String
+
+  implicit lazy val spark: SparkSession = SparkSession
+    .builder()
+    .appName(appName)
+    .getOrCreate()
+}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
similarity index 87%
rename from gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
rename to common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
index 5628ceb..94c53f1 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -12,21 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.gitcommits.support.ops
+package com.gerritforge.analytics.support.ops
 
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time._
 import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime, ZoneOffset}
+import java.util.TimeZone
 
-package AnalyticsTimeOps {
+import scala.util.Try
 
-  import java.sql.Timestamp
-  import java.text.SimpleDateFormat
-  import java.time.{Instant, LocalDate, OffsetDateTime}
-  import java.util.TimeZone
-
-  import scala.util.Try
-
-  object AnalyticsDateTimeFormater {
+  object AnalyticsDateTimeFormatter {
 
     val yyyy_MM_dd_HHmmss_SSSSSSSSS: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
     val yyyy_MM_dd: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
@@ -44,7 +40,7 @@
   }
 
   object CommonTimeOperations {
-    def nowEpoch: Long = Instant.now().getEpochSecond
+    def nowEpoch: Long = Instant.now().atOffset(ZoneOffset.UTC).toInstant.getEpochSecond
 
     def epochToSqlTimestampOps(epoch: Long) = new Timestamp(epoch)
 
@@ -81,4 +77,3 @@
 
     implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
   }
-}
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/GerritSourceOps.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/GerritSourceOps.scala
new file mode 100644
index 0000000..27d0280
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/GerritSourceOps.scala
@@ -0,0 +1,28 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+
+import scala.io.Source
+
+object GerritSourceOps {
+
+  implicit class PimpedSource(source: Source) {
+    def dropGerritPrefix: Iterator[Char] = {
+      val GERRIT_PREFIX = ")]}'\n"
+      source.drop(GERRIT_PREFIX.length)
+    }
+  }
+
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala
new file mode 100644
index 0000000..2e0ed66
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/ReadsOps.scala
@@ -0,0 +1,37 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+
+import java.time.LocalDate
+
+import scopt.Read
+import scopt.Read.reads
+
+import scala.util.control.NonFatal
+
+object ReadsOps {
+
+  implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
+    val cliDateFormat = AnalyticsDateTimeFormatter.yyyy_MM_dd
+    try {
+      import com.gerritforge.analytics.support.ops.implicits._
+      dateStr.parseStringToLocalDate(cliDateFormat).get
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(
+          s"Invalid date '$dateStr'; expected format is '$cliDateFormat'", e)
+    }
+  }
+}
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
similarity index 95%
rename from gitcommits/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
rename to common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index 5f89e78..18002ff 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -1,11 +1,11 @@
-// Copyright (C) 2017 GerritForge Ltd
+// Copyright (C) 2018 GerritForge Ltd
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
 // http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,4 +30,4 @@
   override protected def afterAll() = {
     spark.close()
   }
-}
\ No newline at end of file
+}
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
similarity index 78%
rename from gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
rename to common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
index c8fc602..8c2d456 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.gitcommits.support.ops
+package com.gerritforge.analytics.support.ops
 
 import java.time.{LocalDate, LocalDateTime, ZoneOffset}
 
-import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 import org.scalatest.{FlatSpec, Matchers}
 
 class AnalyticsTimeOpsSpec extends FlatSpec with Matchers {
@@ -30,9 +29,9 @@
         .toEpochMilli
 
     val stringDate = "2018-01-01 12:00:00.000000000"
-    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd_HHmmss_SSSSSSSSS
+    val dateFormat = AnalyticsDateTimeFormatter.yyyy_MM_dd_HHmmss_SSSSSSSSS
 
-    import AnalyticsTimeOps.implicits._
+    import com.gerritforge.analytics.support.ops.implicits._
 
     stringDate.parseStringToUTCEpoch(dateFormat).get should equal(epochValueUTC)
   }
@@ -41,18 +40,18 @@
       LocalDate.of(2018, 1, 1)
 
     val stringDate = "2018-01-01"
-    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+    val dateFormat = AnalyticsDateTimeFormatter.yyyy_MM_dd
 
-    import AnalyticsTimeOps.implicits._
+    import com.gerritforge.analytics.support.ops.implicits._
 
     stringDate.parseStringToLocalDate(dateFormat).get should equal(utcLocalDate)
   }
 
   "String parser - An incorrect string a given format" should "return None" in {
     val stringDate = "2018-01-01 12:00:00.000000000"
-    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+    val dateFormat = AnalyticsDateTimeFormatter.yyyy_MM_dd
 
-    import AnalyticsTimeOps.implicits._
+    import com.gerritforge.analytics.support.ops.implicits._
     stringDate.parseStringToUTCEpoch(dateFormat) should equal(None)
   }
 
@@ -64,7 +63,7 @@
         .toInstant.toEpochMilli
 
     val yyyyMMddHHStr = "2018010112"
-    AnalyticsDateTimeFormater.yyyyMMddHH.format(epochValueUTC) should equal(yyyyMMddHHStr)
+    AnalyticsDateTimeFormatter.yyyyMMddHH.format(epochValueUTC) should equal(yyyyMMddHHStr)
   }
 
   it should "convert to the correct strings - yyyyMMdd" in {
@@ -75,7 +74,7 @@
         .toInstant.toEpochMilli
 
     val yyyyMMddStr = "20180101"
-    AnalyticsDateTimeFormater.yyyyMMdd.format(epochValueUTC) should equal(yyyyMMddStr)
+    AnalyticsDateTimeFormatter.yyyyMMdd.format(epochValueUTC) should equal(yyyyMMddStr)
   }
 
   it should "convert to the correct strings - yyyyMM" in {
@@ -86,7 +85,7 @@
         .toInstant.toEpochMilli
 
     val yyyyMMStr = "201801"
-    AnalyticsDateTimeFormater.yyyyMM.format(epochValueUTC) should equal(yyyyMMStr)
+    AnalyticsDateTimeFormatter.yyyyMM.format(epochValueUTC) should equal(yyyyMMStr)
   }
 
   it should "convert to the correct strings - yyyy" in {
@@ -97,7 +96,7 @@
         .toInstant.toEpochMilli
 
     val yyyyStr = "2018"
-    AnalyticsDateTimeFormater.yyyy.format(epochValueUTC) should equal(yyyyStr)
+    AnalyticsDateTimeFormatter.yyyy.format(epochValueUTC) should equal(yyyyStr)
   }
 
   "UTC conversion" should "check date operations return always UTC" in {
@@ -108,7 +107,7 @@
     val etcDateTime = dateTime.atOffset(ZoneOffset.ofHours(9))
     val utcDateTime = dateTime.atOffset(ZoneOffset.UTC)
 
-    import AnalyticsTimeOps.implicits._
+    import com.gerritforge.analytics.support.ops.implicits._
     dateTime.convertToUTCEpochMillis should equal(utcDateTime.toInstant.toEpochMilli)
     dateTime.convertToUTCEpochMillis should not equal (etcDateTime.toInstant.toEpochMilli)
 
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
index 197367a..b56babe 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
@@ -17,7 +17,7 @@
 import java.text.DateFormat
 import java.time.LocalDateTime
 
-import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
+import com.gerritforge.analytics.support.ops.{AnalyticsDateTimeFormatter, CommonTimeOperations}
 
 import scala.util.Try
 
@@ -60,25 +60,25 @@
   }
 
   object aggregateByEmailAndHour extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormater.yyyyMMddHH
+    val dateFormat = AnalyticsDateTimeFormatter.yyyyMMddHH
   }
 
   object aggregateByEmailAndDay extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormater.yyyyMMdd
+    val dateFormat = AnalyticsDateTimeFormatter.yyyyMMdd
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(hour = 0)
   }
 
   object aggregateByEmailAndMonth extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormater.yyyyMM
+    val dateFormat = AnalyticsDateTimeFormatter.yyyyMM
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(day = 0, hour = 0)
   }
 
   object aggregateByEmailAndYear extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormater.yyyy
+    val dateFormat = AnalyticsDateTimeFormatter.yyyy
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index de9ae01..996a553 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -14,33 +14,25 @@
 
 package com.gerritforge.analytics.gitcommits.job
 
+
 import java.time.LocalDate
 
 import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import com.gerritforge.analytics.gitcommits.engine.events.{
-  AggregationStrategy,
-  EventParser,
-  GerritJsonEvent
-}
-import com.gerritforge.analytics.gitcommits.model.{
-  GerritEndpointConfig,
-  GerritProject,
-  GerritProjectsSupport
-}
-import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps
-import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import com.gerritforge.analytics.gitcommits.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
+import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
+import com.gerritforge.analytics.spark.SparkApp
+import com.gerritforge.analytics.support.ops.ReadsOps._
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import scopt.Read.reads
-import scopt.{OptionParser, Read}
+import scopt.OptionParser
 
 import scala.io.Codec
-import scala.util.control.NonFatal
 import scala.util.{Failure, Success}
 
-object Main extends App with Job with LazyLogging with FetchRemoteProjects {
+object Main extends App with SparkApp with Job with LazyLogging with FetchRemoteProjects {
+  override val appName = "Gerrit GitCommits Analytics ETL"
 
   private val fileExists: String => Either[String, Unit] = { path =>
     if (!new java.io.File(path).exists) {
@@ -50,18 +42,6 @@
     }
   }
 
-  implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
-    val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
-    try {
-      import AnalyticsTimeOps.implicits._
-      dateStr.parseStringToLocalDate(cliDateFormat).get
-    } catch {
-      case NonFatal(e) =>
-        throw new IllegalArgumentException(
-          s"Invalid date '$dateStr' expected format is '${cliDateFormat}'")
-    }
-  }
-
   private val cliOptionParser: OptionParser[GerritEndpointConfig] =
     new scopt.OptionParser[GerritEndpointConfig]("scopt") {
       head("scopt", "3.x")
@@ -117,10 +97,6 @@
 
   cliOptionParser.parse(args, GerritEndpointConfig()) match {
     case Some(config) =>
-      implicit val spark: SparkSession = SparkSession
-        .builder()
-        .appName("Gerrit Analytics ETL")
-        .getOrCreate()
 
       implicit val _: GerritEndpointConfig = config
 
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
index b949fa9..99de66d 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
@@ -18,8 +18,7 @@
 import java.time.{LocalDate, ZoneOffset}
 
 import com.gerritforge.analytics.common.api.GerritConnectivity
-import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-
+import com.gerritforge.analytics.support.ops.AnalyticsDateTimeFormatter
 case class GerritEndpointConfig(
     baseUrl: Option[String] = None,
     prefix: Option[String] = None,
@@ -51,7 +50,7 @@
 
   @transient
   private lazy val format: DateTimeFormatter =
-    AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
+    AnalyticsDateTimeFormatter.yyyy_MM_dd.withZone(ZoneOffset.UTC)
   val queryString = Seq(
     "since"            -> since.map(format.format),
     "until"            -> until.map(format.format),
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
index 5dda011..31b6a3a 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
@@ -14,13 +14,13 @@
 
 package com.gerritforge.analytics.gitcommits.model
 
+import com.gerritforge.analytics.support.ops.GerritSourceOps._
 import com.google.gerrit.extensions.api.GerritApi
 import com.google.inject.Inject
 import org.json4s.native.JsonMethods.parse
 
 import scala.io.Source
 import scala.util.Try
-
 case class GerritProject(id: String, name: String)
 
 class GerritProjectsSupport @Inject()(gerritApi: GerritApi) {
@@ -36,11 +36,8 @@
 
 object GerritProjectsSupport {
 
-  val GERRIT_PREFIX = ")]}'\n"
-  private val GERRIT_PREFIX_LEN = GERRIT_PREFIX.length
-
   def parseJsonProjectListResponse(jsonSource: Source): Seq[GerritProject] = {
-    parse(jsonSource.drop(GERRIT_PREFIX_LEN).mkString)
+    parse(jsonSource.dropGerritPrefix.mkString)
       .values
       .asInstanceOf[Map[String, Map[String, String]]]
       .mapValues(projectAttributes => projectAttributes("id"))
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
index 043bd5b..df3d1f5 100644
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
+++ b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
@@ -13,12 +13,8 @@
 // limitations under the License.
 
 package com.gerritforge.analytics.gitcommits.engine.events
-
 import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{
-  CommitInfo,
-  UserActivitySummary
-}
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
 import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
 import org.apache.spark.rdd.RDD
 import org.scalatest.{Inside, Matchers, WordSpec}
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index 9461d32..7a6f5b2 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -1,3 +1,17 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 import Versions._
 import com.typesafe.sbt.GitPlugin.autoImport.git
 import sbt.Keys._