Add Git commits Analytics namespace

This allows a better code layout when adding new sources of analytics data
(e.g., audit logs). The package name has been extended with the name of the
data source: "com.gerritforge.analytics.<source>".

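For example, the Git commits ETL job and plugin entry points move from the
generic analytics namespace into the "gitcommits" one:

  com.gerritforge.analytics.job.Main          -> com.gerritforge.analytics.gitcommits.job.Main
  com.gerritforge.analytics.plugin.Module     -> com.gerritforge.analytics.gitcommits.plugin.Module
  com.gerritforge.analytics.plugin.SshModule  -> com.gerritforge.analytics.gitcommits.plugin.SshModule
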
Feature: Issue 9983
Change-Id: I3a8d912ab34550f2a21f51f66d5ca86671f5dfe2
diff --git a/README.md b/README.md
index 2cdf5d2..44c9229 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,7 @@
 
 ```bash
 bin/spark-submit \
+    --class com.gerritforge.analytics.gitcommits.job.Main \
     --conf spark.es.nodes=es.mycompany.com \
     $JARS/analytics-etl.jar \
     --since 2000-06-01 \
diff --git a/build.sbt b/build.sbt
index 2a2c22e..3da5061 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@
 
 val pluginName = "analytics-etl"
 
-val mainClassPackage = "com.gerritforge.analytics.job.Main"
+val mainClassPackage = "com.gerritforge.analytics.gitcommits.job.Main"
 val dockerRepository = "spark-gerrit-analytics-etl"
 
 libraryDependencies ++= Seq(
@@ -87,8 +87,8 @@
 packageOptions in(Compile, packageBin) += Package.ManifestAttributes(
   ("Gerrit-ApiType", "plugin"),
   ("Gerrit-PluginName", pluginName),
-  ("Gerrit-Module", "com.gerritforge.analytics.plugin.Module"),
-  ("Gerrit-SshModule", "com.gerritforge.analytics.plugin.SshModule"),
+  ("Gerrit-Module", "com.gerritforge.analytics.gitcommits.plugin.Module"),
+  ("Gerrit-SshModule", "com.gerritforge.analytics.gitcommits.plugin.SshModule"),
   ("Implementation-Title", "Analytics ETL plugin"),
   ("Implementation-URL", "https://gerrit.googlesource.com/plugins/analytics-etl")
 )
diff --git a/scripts/gerrit-analytics-etl.sh b/scripts/gerrit-analytics-etl.sh
index 30cf2cf..5a0de69 100755
--- a/scripts/gerrit-analytics-etl.sh
+++ b/scripts/gerrit-analytics-etl.sh
@@ -10,7 +10,7 @@
 # Optional
 ES_PORT="${ES_PORT:-9200}"
 SPARK_JAR_PATH="${SPARK_JAR_PATH:-/app/analytics-etl-assembly.jar}"
-SPARK_JAR_CLASS="${SPARK_JAR_CLASS:-com.gerritforge.analytics.job.Main}"
+SPARK_JAR_CLASS="${SPARK_JAR_CLASS:-com.gerritforge.analytics.gitcommits.job.Main}"
 
 echo "* Elastic Search Host: $ES_HOST:$ES_PORT"
 echo "* Gerrit URL: $GERRIT_URL"
diff --git a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
index 8417f1d..ece68d1 100644
--- a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/api/gerritApiConnectivity.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.api
+package com.gerritforge.analytics.gitcommits.api
 
 import java.net.URL
 
@@ -21,7 +21,6 @@
 
 import scala.io.{BufferedSource, Codec, Source}
 
-
 sealed trait HttpBasicAuthentication {
 
   val BASIC = "Basic"
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
index a0733bc..ada6a12 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
@@ -12,14 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.engine
+package com.gerritforge.analytics.gitcommits.engine
 
 import java.io.IOException
 import java.time.format.DateTimeFormatter
 import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
 
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.model._
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.model._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{udf, _}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
@@ -41,7 +41,6 @@
     sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection))
   }
 
-
   def filterEmptyStrings(urlSource: BufferedSource): Iterator[String] =
     urlSource.getLines()
       .filterNot(_.trim.isEmpty)
@@ -163,7 +162,6 @@
 
   private def emailToDomainUdf = udf(emailToDomain(_: String))
 
-
   implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
 
     def fetchRawContributors(gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession): RDD[(String, String)] = {
@@ -192,5 +190,4 @@
       .toDF("project", "json")
       .transformCommitterInfo
   }
-
 }
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
similarity index 94%
rename from src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
index 95aa113..197367a 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
 
 import java.text.DateFormat
 import java.time.LocalDateTime
 
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
 
 import scala.util.Try
 
@@ -84,4 +84,4 @@
       super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
   }
 
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
index 121b98c..07d9bc1 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
@@ -12,18 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
 
 import java.time.{LocalDateTime, ZoneOffset}
 
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 
 import scala.util.{Failure, Success}
 
-
 object GerritEventsTransformations extends LazyLogging {
 
   case class NotParsableJsonEvent(source: String, failureDescription: String)
@@ -35,6 +34,7 @@
   }
 
   implicit class PimpedGerritJsonEventRDD[T <: GerritJsonEvent](val self: RDD[T]) extends AnyVal {
+
     /**
       * Returns the UTC date of the earliest event in the RDD
       *
@@ -97,7 +97,6 @@
     }
   }
 
-
   private def addCommitSummary(summaryTemplate: UserActivitySummary, changes: Iterable[ChangeMergedEvent]): UserActivitySummary = {
     // We assume all the changes are consistent on the is_merge filter
     summaryTemplate.copy(
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/model.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
similarity index 98%
rename from src/main/scala/com/gerritforge/analytics/engine/events/model.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
index 15956a6..fe86d01 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/model.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.engine.events
+package com.gerritforge.analytics.gitcommits.engine.events
 
 import com.typesafe.scalalogging.LazyLogging
 import scala.util.{Failure, Success, Try}
@@ -23,12 +23,10 @@
   def fromJson(json: String): Try[GerritJsonEvent]
 }
 
-
 object EventParser extends GerritJsonEventParser with LazyLogging {
   val ChangeMergedEventType = "change-merged"
   val RefUpdatedEventType = "ref-updated"
 
-
   override def fromJson(json: String): Try[GerritJsonEvent] = {
     import org.json4s._
     import org.json4s.native.JsonMethods._
@@ -93,7 +91,6 @@
                            by: GerritAccount
                          )
 
-
 case class GerritPatchSet(
                            number: String,
                            revision: String,
@@ -111,7 +108,6 @@
   def isDraft : Boolean = draft.getOrElse(false)
 }
 
-
 sealed trait GerritJsonEvent {
   def `type`: String
 
@@ -120,7 +116,6 @@
   def account: GerritAccount
 }
 
-
 sealed trait GerritRepositoryModifiedEvent extends GerritJsonEvent {
   def modifiedProject: String
 
@@ -161,7 +156,6 @@
 
 case class GerritRefUpdate(project: String, refName: String, oldRev: String, newRev: String)
 
-
 case class RefUpdatedEvent(
                             refUpdate: GerritRefUpdate,
                             submitter: Option[GerritAccount],
@@ -176,4 +170,4 @@
   def modifiedRef: String = refUpdate.refName
 
   def newRev: String = refUpdate.newRev
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
new file mode 100644
index 0000000..16a376a
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -0,0 +1,263 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.job
+
+import java.time.LocalDate
+
+import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import com.gerritforge.analytics.gitcommits.engine.events.{
+  AggregationStrategy,
+  EventParser,
+  GerritJsonEvent
+}
+import com.gerritforge.analytics.gitcommits.model.{
+  GerritEndpointConfig,
+  GerritProject,
+  GerritProjectsSupport
+}
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import scopt.Read.reads
+import scopt.{OptionParser, Read}
+
+import scala.io.Codec
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
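+// Standalone entry point: parses the CLI options, builds the contributors stats DataFrame and writes it to the output directory and to Elasticsearch.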
+object Main extends App with Job with LazyLogging with FetchRemoteProjects {
+
+  private val fileExists: String => Either[String, Unit] = { path =>
+    if (!new java.io.File(path).exists) {
+      Left(s"ERROR: Path '$path' doesn't exists!")
+    } else {
+      Right()
+    }
+  }
+
+  implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
+    val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+    try {
+      import AnalyticsTimeOps.implicits._
+      dateStr.parseStringToLocalDate(cliDateFormat).get
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(
+          s"Invalid date '$dateStr': expected format is '$cliDateFormat'")
+    }
+  }
+
+  private val cliOptionParser: OptionParser[GerritEndpointConfig] =
+    new scopt.OptionParser[GerritEndpointConfig]("scopt") {
+      head("scopt", "3.x")
+      opt[String]('u', "url") optional () action { (x, c) =>
+        c.copy(baseUrl = Some(x))
+      } text "gerrit url"
+      opt[String]('p', "prefix") optional () action { (p, c) =>
+        c.copy(prefix = Some(p))
+      } text "projects prefix"
+      opt[String]('o', "out") optional () action { (x, c) =>
+        c.copy(outputDir = x)
+      } text "output directory"
+      opt[String]('e', "elasticIndex") optional () action { (x, c) =>
+        c.copy(elasticIndex = Some(x))
+      } text "index name"
+      opt[LocalDate]('s', "since") optional () action { (x, c) =>
+        c.copy(since = Some(x))
+      } text "begin date "
+      opt[LocalDate]('u', "until") optional () action { (x, c) =>
+        c.copy(until = Some(x))
+      } text "since date"
+      opt[String]('g', "aggregate") optional () action { (x, c) =>
+        c.copy(aggregate = Some(x))
+      } text "aggregate email/email_hour/email_day/email_month/email_year"
+
+      opt[String]('a', "email-aliases") optional () validate fileExists action { (path, c) =>
+        c.copy(emailAlias = Some(path))
+      } text "\"emails to author alias\" input data path"
+      opt[String]("events") optional () action { (eventsPath, config) =>
+        config.copy(eventsPath = Some(eventsPath))
+      } text "location where to load the Gerrit Events"
+      opt[String]("writeNotProcessedEventsTo") optional () action { (failedEventsPath, config) =>
+        config.copy(eventsFailureOutputPath = Some(failedEventsPath))
+      } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
+
+      opt[String]("username") optional () action { (input, c) =>
+        c.copy(username = Some(input))
+      } text "Gerrit API Username"
+
+      opt[String]("password") optional () action { (input, c) =>
+        c.copy(password = Some(input))
+      } text "Gerrit API Password"
+
+      opt[Boolean]('r', "extract-branches") optional () action { (input, c) =>
+        c.copy(extractBranches = Some(input))
+      } text "enables branches extraction for each commit"
+
+    }
+
+  cliOptionParser.parse(args, GerritEndpointConfig()) match {
+    case Some(config) =>
+      implicit val spark: SparkSession = SparkSession
+        .builder()
+        .appName("Gerrit Analytics ETL")
+        .getOrCreate()
+
+      implicit val _: GerritEndpointConfig = config
+
+      logger.info(s"Starting analytics app with config $config")
+
+      val dataFrame = buildProjectStats().cache() // This dataframe is written twice
+
+      logger.info(s"ES content created, saving it to '${config.outputDir}'")
+      dataFrame.write.json(config.outputDir)
+
+      saveES(dataFrame)
+
+    case None => // invalid configuration usage has been displayed
+  }
+}
+
+trait Job {
+  self: LazyLogging with FetchProjects =>
+  implicit val codec = Codec.ISO8859
+
+  def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
+    import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
+
+    implicit val sc: SparkContext = spark.sparkContext
+
+    val aggregationStrategy: AggregationStrategy =
+      config.aggregate
+        .flatMap { agg =>
+          AggregationStrategy.byName(agg) match {
+            case Success(strategy) => Some(strategy)
+            case Failure(e) =>
+              logger.warn(s"Invalid aggregation strategy '$agg", e)
+              None
+          }
+        }
+        .getOrElse(AggregationStrategy.aggregateByEmail)
+
+    val projects = fetchProjects(config)
+
+    logger.info(
+      s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]")
+      else projects.mkString("[", ",", "]")}")
+
+    val aliasesDF = getAliasDF(config.emailAlias)
+
+    val events = loadEvents
+
+    val failedEvents: RDD[NotParsableJsonEvent] = events.collect {
+      case Left(eventFailureDescription) => eventFailureDescription
+    }
+
+    if (!failedEvents.isEmpty()) {
+      config.eventsFailureOutputPath.foreach { failurePath =>
+        logger.info(s"Events failures will be stored at '$failurePath'")
+
+        import spark.implicits._
+        failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
+      }
+    }
+
+    // We might want to use the time of the events to drive the collection of data from the repository
+    val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
+
+    val firstEventDateMaybe: Option[LocalDate] =
+      if (repositoryAlteringEvents.isEmpty()) None
+      else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
+
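+    // When events are present, override the 'until' date with one month after the earliest event, as long as that date is already in the past.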
+    val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
+      val lastAggregationDate = firstEventDate.plusMonths(1)
+      if (lastAggregationDate.isBefore(LocalDate.now())) {
+        logger.info(
+          s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
+        config.copy(until = Some(lastAggregationDate))
+      } else {
+        config
+      }
+    }
+
+    val statsFromAnalyticsPlugin =
+      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects),
+                                             configWithOverriddenUntil.contributorsUrl,
+                                             config.gerritApiConnection)
+
+    val statsFromEvents = getContributorStatsFromGerritEvents(
+      repositoryAlteringEvents,
+      statsFromAnalyticsPlugin.commitSet.rdd,
+      aggregationStrategy)
+
+    val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
+      statsFromAnalyticsPlugin
+    } else {
+      require(
+        statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
+        s""" Schemas from the stats collected from events and from the analytics datasets differs!!
+           | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+           | From gerrit events: ${statsFromEvents.schema}
+      """.stripMargin
+      )
+
+      (statsFromAnalyticsPlugin union statsFromEvents)
+    }
+
+    mergedEvents.dashboardStats(aliasesDF)
+  }
+
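+  // Loads the Gerrit events file (one JSON event per line), parsing each line into either an event or a parse-failure description.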
+  def loadEvents(
+      implicit config: GerritEndpointConfig,
+      spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = {
+    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
+
+    config.eventsPath.fold(
+      spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
+      spark.read
+        .textFile(eventsPath)
+        .rdd
+        .parseEvents(EventParser)
+    }
+  }
+
+  def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
+    import org.elasticsearch.spark.sql._
+    config.elasticIndex.foreach { esIndex =>
+      logger.info(
+        s"ES content created, saving it to elastic search instance at '${config.elasticIndex}'")
+
+      df.saveToEs(esIndex)
+    }
+
+  }
+}
+
+trait FetchProjects {
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
+}
+
+trait FetchRemoteProjects extends FetchProjects {
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+    config.gerritProjectsUrl.toSeq.flatMap { url =>
+      GerritProjectsSupport.parseJsonProjectListResponse(
+        config.gerritApiConnection.getContentFromApi(url))
+    }
+  }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/Email.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
similarity index 94%
rename from src/main/scala/com/gerritforge/analytics/model/Email.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
index e439ae7..e99a8f1 100644
--- a/src/main/scala/com/gerritforge/analytics/model/Email.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/Email.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
 
 case class Email(user: String, domain: String)
 
@@ -27,4 +27,4 @@
       case _ => None
     }
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
new file mode 100644
index 0000000..69e8cee
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
@@ -0,0 +1,65 @@
+// Copyright (C) 2017 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.model
+
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDate, ZoneOffset}
+
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+
+case class GerritEndpointConfig(
+    baseUrl: Option[String] = None,
+    prefix: Option[String] = None,
+    outputDir: String =
+      s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
+    elasticIndex: Option[String] = None,
+    since: Option[LocalDate] = None,
+    until: Option[LocalDate] = None,
+    aggregate: Option[String] = None,
+    emailAlias: Option[String] = None,
+    eventsPath: Option[String] = None,
+    eventsFailureOutputPath: Option[String] = None,
+    username: Option[String] = None,
+    password: Option[String] = None,
+    extractBranches: Option[Boolean] = None) {
+
+  val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
+
+  val gerritProjectsUrl: Option[String] = baseUrl.map { url =>
+    s"${url}/projects/" + prefix.fold("")("?p=" + _)
+  }
+
+  def queryOpt(opt: (String, Option[String])): Option[String] = {
+    opt match {
+      case (name: String, value: Option[String]) => value.map(name + "=" + _)
+    }
+  }
+
+  @transient
+  private lazy val format: DateTimeFormatter =
+    AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
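+  // Renders the optional filters as a REST query string, e.g. "?since=2018-06-01&aggregate=email".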
+  val queryString = Seq(
+    "since"            -> since.map(format.format),
+    "until"            -> until.map(format.format),
+    "aggregate"        -> aggregate,
+    "extract-branches" -> extractBranches.map(_.toString)
+  ).flatMap(queryOpt).mkString("?", "&", "")
+
+  def contributorsUrl(projectName: String): Option[String] =
+    baseUrl.map { url =>
+      s"$url/projects/$projectName/analytics~contributors$queryString"
+    }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
similarity index 96%
rename from src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
index 139791d..5dda011 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritProject.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritProject.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
 
 import com.google.gerrit.extensions.api.GerritApi
 import com.google.inject.Inject
@@ -51,4 +51,4 @@
   }
 }
 
-case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
\ No newline at end of file
+case class ProjectContributionSource(name: String, contributorsUrl: Option[String])
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
similarity index 93%
rename from src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
index c980431..51f1c39 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/GerritConfigSupport.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/GerritConfigSupport.scala
@@ -1,4 +1,4 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
 
 import com.google.gerrit.server.config.GerritServerConfig
 import com.google.inject.Inject
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
similarity index 68%
rename from src/main/scala/com/gerritforge/analytics/plugin/Module.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
index a42e44e..e7ca14c 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/Module.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/Module.scala
@@ -1,4 +1,4 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
 
 import com.google.inject.AbstractModule
 
diff --git a/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
new file mode 100644
index 0000000..883b1f8
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -0,0 +1,134 @@
+package com.gerritforge.analytics.gitcommits.plugin
+
+import java.sql.Timestamp
+import java.time.LocalDate
+
+import com.gerritforge.analytics.gitcommits.engine.events.AggregationStrategy
+import com.gerritforge.analytics.gitcommits.job.{FetchProjects, Job}
+import com.gerritforge.analytics.gitcommits.model.{
+  GerritEndpointConfig,
+  GerritProject,
+  GerritProjectsSupport
+}
+import com.google.gerrit.server.project.ProjectControl
+import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
+import com.google.inject.Inject
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+import org.kohsuke.args4j.{Argument, Option => ArgOption}
+
+import scala.io.Source
+import scala.util.{Failure, Success}
+
+@CommandMetaData(name = "processGitCommits",
+                 description = "Start the extraction of Git Commits Gerrit analytics")
+class ProcessGitCommitsCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport,
+                                         val gerritConfig: GerritConfigSupport)
+    extends SshCommand
+    with Job
+    with DateConversions
+    with FetchProjects
+    with LazyLogging {
+
+  @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
+  var projectControl: ProjectControl = null
+
+  @ArgOption(name = "--elasticIndex", aliases = Array("-e"), usage = "index name")
+  var elasticIndex: String = "gerrit/analytics"
+
+  @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
+  var beginDate: Timestamp = NO_TIMESTAMP
+
+  @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
+  var endDate: Timestamp = NO_TIMESTAMP
+
+  @ArgOption(name = "--aggregate",
+             aliases = Array("-g"),
+             usage = "aggregate email/email_hour/email_day/email_month/email_year")
+  var aggregate: String = "email_day"
+
+  @ArgOption(name = "--email-aliases",
+             aliases = Array("-a"),
+             usage = "\"emails to author alias\" input data path")
+  var emailAlias: String = null
+
+  @ArgOption(name = "--extract-branches",
+             aliases = Array("-r"),
+             usage = "enables branches extraction for each commit")
+  var extractBranches: Boolean = false
+
+  override def run() {
+    implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(),
+                                               prefix =
+                                                 Option(projectControl).map(_.getProject.getName),
+                                               "",
+                                               elasticIndex,
+                                               beginDate,
+                                               endDate,
+                                               aggregate,
+                                               emailAlias)
+
+    implicit val spark: SparkSession = SparkSession
+      .builder()
+      .appName("analytics-etl")
+      .master("local")
+      .config("key", "value")
+      .getOrCreate()
+
+    implicit lazy val sc: SparkContext = spark.sparkContext
+    implicit lazy val sql: SQLContext  = spark.sqlContext
+
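+    // Run the Spark job with the plugin's classloader as the thread context classloader, restoring the previous one afterwards.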
+    val prevClassLoader = Thread.currentThread().getContextClassLoader
+    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+    try {
+      stdout.println(s"Starting new Spark job with parameters: $config")
+      stdout.flush()
+      val startTs      = System.currentTimeMillis
+      val projectStats = buildProjectStats().cache()
+      val numRows      = projectStats.count()
+
+      import org.elasticsearch.spark.sql._
+      config.elasticIndex.foreach { esIndex =>
+        stdout.println(
+          s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}'")
+        stdout.flush()
+        projectStats.saveToEs(esIndex)
+      }
+
+      val elapsedTs = (System.currentTimeMillis - startTs) / 1000L
+      stdout.println(s"Job COMPLETED in $elapsedTs secs")
+    } catch {
+      case e: Throwable =>
+        e.printStackTrace()
+        stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
+        die(e)
+    } finally {
+      Thread.currentThread().setContextClassLoader(prevClassLoader)
+    }
+  }
+
+  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
+    config.prefix.toSeq.flatMap(projectName =>
+      gerritProjects.getProject(projectName) match {
+        case Success(project) =>
+          Seq(project)
+        case Failure(e) => {
+          logger.warn(s"Unable to fetch project $projectName", e)
+          Seq()
+        }
+    })
+  }
+}
+
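+// Maps the epoch-zero Timestamp sentinel used by the CLI options to None, so unset dates are ignored.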
+trait DateConversions {
+  val NO_TIMESTAMP = new Timestamp(0L)
+
+  implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
+    case NO_TIMESTAMP => None
+    case ts           => Some(ts.toLocalDateTime.toLocalDate)
+  }
+
+  implicit def nullableStringToOption(nullableString: String): Option[String] =
+    Option(nullableString)
+}
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
similarity index 60%
rename from src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
index 290b1ff..9d1bc0b 100644
--- a/src/main/scala/com/gerritforge/analytics/plugin/SshModule.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/SshModule.scala
@@ -1,9 +1,9 @@
-package com.gerritforge.analytics.plugin
+package com.gerritforge.analytics.gitcommits.plugin
 
 import com.google.gerrit.sshd.PluginCommandModule
 
 class SshModule extends PluginCommandModule {
   override protected def configureCommands {
-    command(classOf[StartCommand])
+    command(classOf[ProcessGitCommitsCommand])
   }
 }
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
similarity index 97%
rename from src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
rename to src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
index 9012617..5628ceb 100644
--- a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
+++ b/src/main/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOps.scala
@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-
-package com.gerritforge.analytics.support.ops
+package com.gerritforge.analytics.gitcommits.support.ops
 
 import java.time.format.DateTimeFormatter
 import java.time.{LocalDateTime, ZoneOffset}
@@ -62,7 +61,6 @@
       def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
     }
 
-
     implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
       def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
         Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
@@ -83,4 +81,4 @@
 
     implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
deleted file mode 100644
index 099efac..0000000
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.job
-
-import java.time.LocalDate
-
-import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-import com.typesafe.scalalogging.LazyLogging
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import scopt.Read.reads
-import scopt.{OptionParser, Read}
-
-import scala.io.Codec
-import scala.util.control.NonFatal
-import scala.util.{Failure, Success}
-
-object Main extends App with Job with LazyLogging with FetchRemoteProjects {
-
-  private val fileExists: String => Either[String, Unit] = { path =>
-    if (!new java.io.File(path).exists) {
-      Left(s"ERROR: Path '$path' doesn't exists!")
-    } else {
-      Right()
-    }
-  }
-
-  implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
-    val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
-    try {
-      import AnalyticsTimeOps.implicits._
-      dateStr.parseStringToLocalDate(cliDateFormat).get
-    } catch {
-      case NonFatal(e) =>
-        throw new IllegalArgumentException(s"Invalid date '$dateStr' expected format is '${cliDateFormat}'")
-    }
-  }
-
-  private val cliOptionParser: OptionParser[GerritEndpointConfig] = new scopt.OptionParser[GerritEndpointConfig]("scopt") {
-    head("scopt", "3.x")
-    opt[String]('u', "url") optional() action { (x, c) =>
-      c.copy(baseUrl = Some(x))
-    } text "gerrit url"
-    opt[String]('p', "prefix") optional() action { (p, c) =>
-      c.copy(prefix = Some(p))
-    } text "projects prefix"
-    opt[String]('o', "out") optional() action { (x, c) =>
-      c.copy(outputDir = x)
-    } text "output directory"
-    opt[String]('e', "elasticIndex") optional() action { (x, c) =>
-      c.copy(elasticIndex = Some(x))
-    } text "index name"
-    opt[LocalDate]('s', "since") optional() action { (x, c) =>
-      c.copy(since = Some(x))
-    } text "begin date "
-    opt[LocalDate]('u', "until") optional() action { (x, c) =>
-      c.copy(until = Some(x))
-    } text "since date"
-    opt[String]('g', "aggregate") optional() action { (x, c) =>
-      c.copy(aggregate = Some(x))
-    } text "aggregate email/email_hour/email_day/email_month/email_year"
-
-    opt[String]('a', "email-aliases") optional() validate fileExists action { (path, c) =>
-      c.copy(emailAlias = Some(path))
-    } text "\"emails to author alias\" input data path"
-    opt[String]("events") optional() action { (eventsPath, config) =>
-      config.copy(eventsPath = Some(eventsPath))
-    } text "location where to load the Gerrit Events"
-    opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
-      config.copy(eventsFailureOutputPath = Some(failedEventsPath))
-    } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
-
-    opt[String]("username") optional() action { (input, c) =>
-      c.copy(username = Some(input))
-    } text "Gerrit API Username"
-
-    opt[String]("password") optional() action { (input, c) =>
-      c.copy(password = Some(input))
-    } text "Gerrit API Password"
-
-    opt[Boolean]('r', "extract-branches") optional() action { (input, c) =>
-      c.copy(extractBranches = Some(input))
-    } text "enables branches extraction for each commit"
-
-  }
-
-  cliOptionParser.parse(args, GerritEndpointConfig()) match {
-    case Some(config) =>
-      implicit val spark: SparkSession = SparkSession.builder()
-        .appName("Gerrit Analytics ETL")
-        .getOrCreate()
-
-      implicit val _: GerritEndpointConfig = config
-
-      logger.info(s"Starting analytics app with config $config")
-
-      val dataFrame = buildProjectStats().cache() //This dataframe is written twice
-
-      logger.info(s"ES content created, saving it to '${config.outputDir}'")
-      dataFrame.write.json(config.outputDir)
-
-      saveES(dataFrame)
-
-    case None => // invalid configuration usage has been displayed
-  }
-}
-
-trait Job {
-  self: LazyLogging with FetchProjects =>
-  implicit val codec = Codec.ISO8859
-
-  def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
-    import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-    import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
-
-    implicit val sc: SparkContext = spark.sparkContext
-
-    val aggregationStrategy: AggregationStrategy =
-      config.aggregate.flatMap { agg =>
-        AggregationStrategy.byName(agg) match {
-          case Success(strategy) => Some(strategy)
-          case Failure(e) =>
-            logger.warn(s"Invalid aggregation strategy '$agg", e)
-            None
-        }
-      }.getOrElse(AggregationStrategy.aggregateByEmail)
-
-    val projects = fetchProjects(config)
-
-    logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
-
-    val aliasesDF = getAliasDF(config.emailAlias)
-
-    val events = loadEvents
-
-    val failedEvents: RDD[NotParsableJsonEvent] = events.collect { case Left(eventFailureDescription) => eventFailureDescription }
-
-    if (!failedEvents.isEmpty()) {
-      config.eventsFailureOutputPath.foreach { failurePath =>
-        logger.info(s"Events failures will be stored at '$failurePath'")
-
-        import spark.implicits._
-        failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
-      }
-    }
-
-    //We might want to use the time of the events as information to feed to the collection of data from the repository
-    val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
-
-    val firstEventDateMaybe: Option[LocalDate] = if (repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
-
-    val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
-      val lastAggregationDate = firstEventDate.plusMonths(1)
-      if (lastAggregationDate.isBefore(LocalDate.now())) {
-        logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
-        config.copy(until = Some(lastAggregationDate))
-      } else {
-        config
-      }
-    }
-
-    val statsFromAnalyticsPlugin =
-      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl, config.gerritApiConnection)
-
-    val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
-
-    val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
-      statsFromAnalyticsPlugin
-    } else {
-      require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
-        s""" Schemas from the stats collected from events and from the analytics datasets differs!!
-           | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
-           | From gerrit events: ${statsFromEvents.schema}
-      """.stripMargin)
-
-      (statsFromAnalyticsPlugin union statsFromEvents)
-    }
-
-    mergedEvents.dashboardStats(aliasesDF)
-  }
-
-  def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
-    import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
-
-    config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
-      spark
-        .read.textFile(eventsPath).rdd
-        .parseEvents(EventParser)
-    }
-  }
-
-  def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
-    import org.elasticsearch.spark.sql._
-    config.elasticIndex.foreach { esIndex =>
-      logger.info(s"ES content created, saving it to elastic search instance at '${config.elasticIndex}'")
-
-      df.saveToEs(esIndex)
-    }
-
-  }
-}
-
-trait FetchProjects {
-  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject]
-}
-
-trait FetchRemoteProjects extends FetchProjects {
-  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
-    config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(config.gerritApiConnection.getContentFromApi(url)) }
-  }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
deleted file mode 100644
index d76743d..0000000
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.model
-
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneOffset}
-
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
-
-case class GerritEndpointConfig(baseUrl: Option[String] = None,
-                                prefix: Option[String] = None,
-                                outputDir: String = s"file://${System.getProperty("java.io.tmpdir")}/analytics-${System.nanoTime()}",
-                                elasticIndex: Option[String] = None,
-                                since: Option[LocalDate] = None,
-                                until: Option[LocalDate] = None,
-                                aggregate: Option[String] = None,
-                                emailAlias: Option[String] = None,
-                                eventsPath: Option[String] = None,
-                                eventsFailureOutputPath: Option[String] = None,
-                                username: Option[String] = None,
-                                password: Option[String] = None,
-                                extractBranches: Option[Boolean] = None
-                               ) {
-
-
-  val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
-
-  val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
-
-  def queryOpt(opt: (String, Option[String])): Option[String] = {
-    opt match {
-      case (name: String, value: Option[String]) => value.map(name + "=" + _)
-    }
-  }
-
-  @transient
-  private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
-  val queryString = Seq(
-    "since" -> since.map(format.format),
-    "until" -> until.map(format.format),
-    "aggregate" -> aggregate,
-    "extract-branches" -> extractBranches.map(_.toString)
-  ).flatMap(queryOpt).mkString("?", "&", "")
-
-  def contributorsUrl(projectName: String): Option[String] =
-    baseUrl.map { url => s"$url/projects/$projectName/analytics~contributors$queryString" }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala b/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
deleted file mode 100644
index 9832df5..0000000
--- a/src/main/scala/com/gerritforge/analytics/plugin/StartCommand.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.gerritforge.analytics.plugin
-
-import java.sql.Timestamp
-import java.time.LocalDate
-
-import com.gerritforge.analytics.engine.events.AggregationStrategy
-import com.gerritforge.analytics.job.{FetchProjects, Job}
-import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
-import com.google.gerrit.server.project.ProjectControl
-import com.google.gerrit.sshd.{CommandMetaData, SshCommand}
-import com.google.inject.Inject
-import com.typesafe.scalalogging.LazyLogging
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
-import org.kohsuke.args4j.{Argument, Option => ArgOption}
-
-import scala.io.Source
-import scala.util.{Failure, Success}
-
-@CommandMetaData(name = "start", description = "Start the extraction of Gerrit analytics")
-class StartCommand @Inject()(implicit val gerritProjects: GerritProjectsSupport, val gerritConfig: GerritConfigSupport)
-  extends SshCommand with Job with DateConversions with FetchProjects with LazyLogging {
-
-  @Argument(index = 0, required = true, metaVar = "PROJECT", usage = "project name")
-  var projectControl: ProjectControl = null
-
-  @ArgOption(name = "--elasticIndex", aliases = Array("-e"),
-    usage = "index name")
-  var elasticIndex: String = "gerrit/analytics"
-
-  @ArgOption(name = "--since", aliases = Array("-s"), usage = "begin date")
-  var beginDate: Timestamp = NO_TIMESTAMP
-
-  @ArgOption(name = "--until", aliases = Array("-u"), usage = "end date")
-  var endDate: Timestamp = NO_TIMESTAMP
-
-  @ArgOption(name = "--aggregate", aliases = Array("-g"), usage = "aggregate email/email_hour/email_day/email_month/email_year")
-  var aggregate: String = "email_day"
-
-  @ArgOption(name = "--email-aliases", aliases = Array("-a"), usage = "\"emails to author alias\" input data path")
-  var emailAlias: String = null
-
-  @ArgOption(name = "--extract-branches", aliases = Array("-r"), usage = "enables branches extraction for each commit")
-  var extractBranches: Boolean = false
-
-  override def run() {
-    implicit val config = GerritEndpointConfig(gerritConfig.getListenUrl(), prefix = Option(projectControl).map(_.getProject.getName), "", elasticIndex,
-      beginDate, endDate, aggregate, emailAlias)
-
-    implicit val spark: SparkSession = SparkSession.builder()
-      .appName("analytics-etl")
-      .master("local")
-      .config("key", "value")
-      .getOrCreate()
-
-    implicit lazy val sc: SparkContext = spark.sparkContext
-    implicit lazy val sql: SQLContext = spark.sqlContext
-
-    val prevClassLoader = Thread.currentThread().getContextClassLoader
-    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
-    try {
-      stdout.println(s"Starting new Spark job with parameters: $config")
-      stdout.flush()
-      val startTs = System.currentTimeMillis
-      val projectStats = buildProjectStats().cache()
-      val numRows = projectStats.count()
-
-        import org.elasticsearch.spark.sql._
-        config.elasticIndex.foreach { esIndex =>
-          stdout.println(s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}'")
-          stdout.flush()
-          projectStats.saveToEs(esIndex)
-        }
-
-      val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
-      stdout.println(s"Job COMPLETED in $elaspsedTs secs")
-    } catch {
-      case e: Throwable =>
-        e.printStackTrace()
-        stderr.println(s"Job FAILED: ${e.getClass} : ${e.getMessage}")
-        die(e)
-    } finally {
-      Thread.currentThread().setContextClassLoader(prevClassLoader)
-    }
-  }
-
-  def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
-    config.prefix.toSeq.flatMap(projectName => gerritProjects.getProject(projectName) match {
-      case Success(project) =>
-        Seq(project)
-      case Failure(e) => {
-        logger.warn(s"Unable to fetch project $projectName", e)
-        Seq()
-      }
-    })
-  }
-}
-
-trait DateConversions {
-  val NO_TIMESTAMP = new Timestamp(0L)
-
-  implicit def timestampToLocalDate(timestamp: Timestamp): Option[LocalDate] = timestamp match {
-    case NO_TIMESTAMP => None
-    case ts => Some(ts.toLocalDateTime.toLocalDate)
-  }
-
-  implicit def nullableStringToOption(nullableString: String): Option[String] = Option(nullableString)
-}
-
-
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index ec9d3e5..65f4571 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -17,9 +17,9 @@
 import java.io.{ByteArrayInputStream, File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 
-import com.gerritforge.analytics.api.GerritConnectivity
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
+import com.gerritforge.analytics.gitcommits.api.GerritConnectivity
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+import com.gerritforge.analytics.gitcommits.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
 import org.apache.spark.sql.Row
 import org.json4s.JsonDSL._
 import org.json4s._
@@ -48,7 +48,6 @@
     projects should contain only(GerritProject("All-Projects-id", "All-Projects-name"), GerritProject("Test-id", "Test-name"))
   }
 
-
   "enrichWithSource" should "enrich project RDD object with its source" in {
 
     val projectRdd = sc.parallelize(Seq(GerritProject("project-id", "project-name")))
@@ -75,7 +74,6 @@
       """.stripMargin
     val expectedResult = List("LineOne", "LineTwo", "LineThree")
 
-
     val inputStream = new ByteArrayInputStream(contentWithEmptyLines.getBytes)
     val contentWithoutEmptyLines = filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
 
@@ -179,7 +177,6 @@
       ("aliased_author_with_organization", "aliased_email@aliased_organization.com", "aliased_organization"),
       ("aliased_author_empty_organization", "aliased_email@emtpy_organization.com", ""),
       ("aliased_author_null_organization", "aliased_email@null_organization.com", null)
-
     )).toDF("author", "email", "organization")
 
     val inputSampleDF = sc.parallelize(Seq(
@@ -196,8 +193,7 @@
 
     val df = inputSampleDF.handleAliases(Some(aliasDF))
 
-    df.schema.fields.map(_.name) should contain allOf(
-      "author", "email", "organization")
+    df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
 
     df.collect should contain theSameElementsAs expectedDF.collect
   }
@@ -214,18 +210,18 @@
 
     val df = inputSampleDF.handleAliases(Some(aliasDF))
 
-    df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
+    df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
   }
 
   it should "return correct columns when alias DF is not defined" in {
     import spark.implicits._
     val expectedTuple = ("author_name", "email@mail.com", "an_organization")
     val inputSampleDF = sc.parallelize(Seq(expectedTuple)).toDF("author", "email", "organization")
-    val expectedRow = Row.fromTuple(expectedTuple)
+    val expectedRow   = Row.fromTuple(expectedTuple)
 
     val df = inputSampleDF.handleAliases(None)
 
-    df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
+    df.schema.fields.map(_.name) should contain allOf ("author", "email", "organization")
     df.collect().head should be(expectedRow)
   }
 
@@ -262,7 +258,6 @@
       "email@mail.companyname-couk.co.uk",
       "email@mail.companyname-com.com",
       "email@mail.companyname-info.info"
-
     )).toDF("email")
 
     val transformed = df.addOrganization()
@@ -283,7 +278,6 @@
     )
   }
 
-
   "extractCommitsPerProject" should "generate a Dataset with the all the SHA of commits with associated project" in {
     import sql.implicits._
 
@@ -306,8 +300,7 @@
   }
 
   private def newSource(contributorsJson: JObject*): Option[String] = {
-    val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"),
-      s"${getClass.getName}-${System.nanoTime()}")
+    val tmpFile = File.createTempFile(System.getProperty("java.io.tmpdir"), s"${getClass.getName}-${System.nanoTime()}")
 
     val out = new OutputStreamWriter(new FileOutputStream(tmpFile), StandardCharsets.UTF_8)
     contributorsJson.foreach(json => out.write(compact(render(json)) + '\n'))
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
deleted file mode 100644
index 434662f..0000000
--- a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
+++ /dev/null
@@ -1,257 +0,0 @@
-// Copyright (C) 2018 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.engine.events
-
-import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
-import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import org.apache.spark.rdd.RDD
-import org.scalatest.{Inside, Matchers, WordSpec}
-
-class GerritEventsTransformationsSpec extends WordSpec with Matchers with SparkTestSupport with Inside with EventFixture {
-
-  "tryParseGerritTriggeredEvent" should {
-
-    implicit val eventParser: GerritJsonEventParser = EventParser
-
-    "Parse a correctly formed event" in new EventFixture {
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(refUpdated.event)
-    }
-
-    "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
-      val invalidJson = "invalid json string"
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
-    }
-
-    "Return a description of the failure with the original event source  in a Left object if the JSON event is not supported" in {
-      val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
-
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
-    }
-  }
-
-  "PimpedJsonRDD" should {
-    "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
-      val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
-
-      import GerritEventsTransformations._
-
-      jsonRdd.parseEvents(EventParser).collect() should contain only(
-        Right(refUpdated.event),
-        Right(changeMerged.event),
-        Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
-      )
-    }
-  }
-
-  "extractUserActivitySummary" should {
-    implicit val eventParser: GerritJsonEventParser = EventParser
-
-    "Build one UserActivitySummary object if given a series of non-merge commits" in {
-      val events: Seq[ChangeMergedEvent] = Seq(
-        aChangeMergedEvent("1", 1001l, newRev = "rev1", insertions = 2, deletions = 1, branch = "stable-1.14"),
-        aChangeMergedEvent("2", 1002l, newRev = "rev2", insertions = 3, deletions = 0, branch = "stable-1.14"),
-        aChangeMergedEvent("3", 1003l, newRev = "rev3", insertions = 1, deletions = 4, branch = "stable-1.14"),
-        aChangeMergedEvent("4", 1004l, newRev = "rev4", insertions = 0, deletions = 2, branch = "stable-1.14"),
-        aChangeMergedEvent("5", 1005l, newRev = "rev5", insertions = 1, deletions = 1, branch = "stable-1.14")
-      ).map(_.event)
-
-      val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
-        changes = events,
-        year = 2018, month = 1, day = 10, hour = 1
-      )
-
-      summaries should have size 1
-
-      inside(summaries.head) {
-        case UserActivitySummary(year, month, day, hour, name, email, num_commits, _, _, added_lines, deleted_lines, commits, branches, last_commit_date, is_merge) =>
-          year shouldBe 2018
-          month shouldBe 1
-          day shouldBe 10
-          hour shouldBe 1
-          name shouldBe "Administrator"
-          email shouldBe "admin@example.com"
-          num_commits shouldBe events.size
-          last_commit_date shouldBe 1005000l
-          is_merge shouldBe false
-          added_lines shouldBe 7
-          deleted_lines shouldBe 8
-          commits should contain only(
-            CommitInfo("rev1", 1001000l, false),
-            CommitInfo("rev2", 1002000l, false),
-            CommitInfo("rev3", 1003000l, false),
-            CommitInfo("rev4", 1004000l, false),
-            CommitInfo("rev5", 1005000l, false)
-          )
-          branches should contain only "stable-1.14"
-      }
-    }
-
-    "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
-      val events: Seq[ChangeMergedEvent] = Seq(
-        aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
-        aChangeMergedEvent("2", 1002l, newRev = "rev2"),
-        aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
-        aChangeMergedEvent("4", 1004l, newRev = "rev4"),
-        aChangeMergedEvent("5", 1005l, newRev = "rev5")
-      ).map(_.event)
-
-      val summaries: Iterable[UserActivitySummary] = GerritEventsTransformations.extractUserActivitySummary(
-        changes = events,
-        year = 2018, month = 1, day = 10, hour = 1
-      )
-
-      summaries should have size 2
-
-      summaries.foreach { summary =>
-        inside(summary) {
-          case UserActivitySummary(year, month, day, hour, name, email,_, _, _, _, _, _, _, _, _) =>
-            year shouldBe 2018
-            month shouldBe 1
-            day shouldBe 10
-            hour shouldBe 1
-            name shouldBe "Administrator"
-            email shouldBe "admin@example.com"
-        }
-      }
-
-      summaries.foreach { summary =>
-        inside(summary) {
-          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, false) =>
-            num_commits shouldBe 3
-            last_commit_date shouldBe 1005000l
-            commits should contain only(
-              CommitInfo("rev2", 1002000l, false),
-              CommitInfo("rev4", 1004000l, false),
-              CommitInfo("rev5", 1005000l, false)
-            )
-
-          case UserActivitySummary(_, _, _, _, _, _, num_commits, _, _, _, _, commits, _, last_commit_date, true) =>
-            num_commits shouldBe 2
-            last_commit_date shouldBe 1003000l
-            commits should contain only(
-              CommitInfo("rev1", 1001000l, true),
-              CommitInfo("rev3", 1003000l, true)
-            )
-        }
-      }
-
-    }
-  }
-
-  "Pimped Per Project UserActivitySummary RDD" should {
-    "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
-      import GerritEventsTransformations._
-      import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
-      import spark.implicits._
-
-      val aliasDF = sc.parallelize(Seq(
-        ("stefano_alias", "stefano@galarraga-org.com", "")
-      )).toDF("author", "email", "organization")
-
-      val expectedDate = System.currentTimeMillis
-
-      val analyticsJobOutput =
-        sc.parallelize(Seq(
-          "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)),
-            Array("master", "stable-2.14"), expectedDate, false)
-        ))
-          .asEtlDataFrame(sql)
-          .addOrganization()
-          .handleAliases(Some(aliasDF))
-          .dropCommits
-
-      val expected = sc.parallelize(Seq(
-        ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, Array("master", "stable-2.14"), "galarraga-org")
-      )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
-        "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "branches", "organization")
-
-      val collected = analyticsJobOutput.collect()
-
-      collected should contain theSameElementsAs expected.collect()
-    }
-  }
-
-  "removeEventsForCommits" should {
-    "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
-      import GerritEventsTransformations._
-
-      val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
-      val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
-      val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(Seq(
-        aRefUpdatedEvent("oldRev", "RevToExclude1"),
-        toKeep1,
-        aRefUpdatedEvent("oldRev", "RevToExclude2"),
-        aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
-        toKeep2
-      ).map(_.event))
-
-      events
-        .removeEventsForCommits(sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
-        .collect() should contain only (toKeep1.event, toKeep2.event )
-    }
-  }
-}
-
-trait EventFixture {
-
-  // Forcing early type failures
-  case class JsonEvent[T <: GerritJsonEvent](json: String) {
-    val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
-  }
-
-  val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(oldRev = "863b64002f2a9922deba69407804a44703c996e0", newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
-
-  val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent("I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
-
-  def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) = JsonEvent[RefUpdatedEvent](
-    s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "refUpdate":{"oldRev":"$oldRev",
-       | "newRev":"$newRev",
-       | "refName": "refs/heads/master","project":"subcut"},
-       |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
-
-  def aChangeMergedEvent(changeId: String, createdOnInSecs: Long = 1000l, newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
-                         isMergeCommit: Boolean = false, insertions: Integer = 0, deletions: Integer = 0, branch: String = "master") = JsonEvent[ChangeMergedEvent](
-    s"""{
-       |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       |"newRev":"$newRev",
-       |"patchSet":{
-       | "number":1,
-       | "revision":"$newRev",
-       | "parents": ${if (isMergeCommit) """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]""" else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
-       | "ref":"refs/changes/01/1/1",
-       | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "createdOn":1516530259,
-       | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
-       | "isDraft":false,
-       | "kind":"REWORK",
-       | "sizeInsertions":$insertions,
-       | "sizeDeletions":$deletions
-       |},
-       |"change":{
-       | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
-       | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
-       | "createdOn":1516530259,"status":"MERGED"
-       |},
-       |"project":{"name":"subcut"},
-       |"refName":"refs/heads/$branch",
-       |"changeKey":{"id":"$changeId"},
-       |"type":"change-merged",
-       |"eventCreatedOn": $createdOnInSecs
-       |}""".stripMargin)
-
-}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
new file mode 100644
index 0000000..043bd5b
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
@@ -0,0 +1,424 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.gitcommits.engine.events
+
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{
+  CommitInfo,
+  UserActivitySummary
+}
+import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
+import org.apache.spark.rdd.RDD
+import org.scalatest.{Inside, Matchers, WordSpec}
+
+class GerritEventsTransformationsSpec
+    extends WordSpec
+    with Matchers
+    with SparkTestSupport
+    with Inside
+    with EventFixture {
+
+  "tryParseGerritTriggeredEvent" should {
+
+    implicit val eventParser: GerritJsonEventParser = EventParser
+
+    "Parse a correctly formed event" in new EventFixture {
+      GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(
+        refUpdated.event)
+    }
+
+    "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
+      val invalidJson = "invalid json string"
+      GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(
+        NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
+    }
+
+    "Return a description of the failure with the original event source  in a Left object if the JSON event is not supported" in {
+      val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
+
+      GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(
+        NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
+    }
+  }
+
+  "PimpedJsonRDD" should {
+    "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
+      val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
+
+      import GerritEventsTransformations._
+
+      jsonRdd.parseEvents(EventParser).collect() should contain only (
+        Right(refUpdated.event),
+        Right(changeMerged.event),
+        Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
+      )
+    }
+  }
+
+  "extractUserActivitySummary" should {
+    implicit val eventParser: GerritJsonEventParser = EventParser
+
+    "Build one UserActivitySummary object if given a series of non-merge commits" in {
+      val events: Seq[ChangeMergedEvent] = Seq(
+        aChangeMergedEvent("1",
+                           1001l,
+                           newRev = "rev1",
+                           insertions = 2,
+                           deletions = 1,
+                           branch = "stable-1.14"),
+        aChangeMergedEvent("2",
+                           1002l,
+                           newRev = "rev2",
+                           insertions = 3,
+                           deletions = 0,
+                           branch = "stable-1.14"),
+        aChangeMergedEvent("3",
+                           1003l,
+                           newRev = "rev3",
+                           insertions = 1,
+                           deletions = 4,
+                           branch = "stable-1.14"),
+        aChangeMergedEvent("4",
+                           1004l,
+                           newRev = "rev4",
+                           insertions = 0,
+                           deletions = 2,
+                           branch = "stable-1.14"),
+        aChangeMergedEvent("5",
+                           1005l,
+                           newRev = "rev5",
+                           insertions = 1,
+                           deletions = 1,
+                           branch = "stable-1.14")
+      ).map(_.event)
+
+      val summaries: Iterable[UserActivitySummary] =
+        GerritEventsTransformations.extractUserActivitySummary(
+          changes = events,
+          year = 2018,
+          month = 1,
+          day = 10,
+          hour = 1
+        )
+
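+      // All five commits are non-merge and by the same author, so they collapse into one summary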
+      summaries should have size 1
+
+      inside(summaries.head) {
+        case UserActivitySummary(year,
+                                 month,
+                                 day,
+                                 hour,
+                                 name,
+                                 email,
+                                 num_commits,
+                                 _,
+                                 _,
+                                 added_lines,
+                                 deleted_lines,
+                                 commits,
+                                 branches,
+                                 last_commit_date,
+                                 is_merge) =>
+          year shouldBe 2018
+          month shouldBe 1
+          day shouldBe 10
+          hour shouldBe 1
+          name shouldBe "Administrator"
+          email shouldBe "admin@example.com"
+          num_commits shouldBe events.size
+          last_commit_date shouldBe 1005000l
+          is_merge shouldBe false
+          added_lines shouldBe 7
+          deleted_lines shouldBe 8
+          commits should contain only (
+            CommitInfo("rev1", 1001000l, false),
+            CommitInfo("rev2", 1002000l, false),
+            CommitInfo("rev3", 1003000l, false),
+            CommitInfo("rev4", 1004000l, false),
+            CommitInfo("rev5", 1005000l, false)
+          )
+          branches should contain only "stable-1.14"
+      }
+    }
+
+    "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
+      val events: Seq[ChangeMergedEvent] = Seq(
+        aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
+        aChangeMergedEvent("2", 1002l, newRev = "rev2"),
+        aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
+        aChangeMergedEvent("4", 1004l, newRev = "rev4"),
+        aChangeMergedEvent("5", 1005l, newRev = "rev5")
+      ).map(_.event)
+
+      val summaries: Iterable[UserActivitySummary] =
+        GerritEventsTransformations.extractUserActivitySummary(
+          changes = events,
+          year = 2018,
+          month = 1,
+          day = 10,
+          hour = 1
+        )
+
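+      // Merge and non-merge commits by the same author split into separate summaries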
+      summaries should have size 2
+
+      summaries.foreach { summary =>
+        inside(summary) {
+          case UserActivitySummary(year,
+                                   month,
+                                   day,
+                                   hour,
+                                   name,
+                                   email,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _) =>
+            year shouldBe 2018
+            month shouldBe 1
+            day shouldBe 10
+            hour shouldBe 1
+            name shouldBe "Administrator"
+            email shouldBe "admin@example.com"
+        }
+      }
+
+      summaries.foreach { summary =>
+        inside(summary) {
+          case UserActivitySummary(_,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   num_commits,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   commits,
+                                   _,
+                                   last_commit_date,
+                                   false) =>
+            num_commits shouldBe 3
+            last_commit_date shouldBe 1005000l
+            commits should contain only (
+              CommitInfo("rev2", 1002000l, false),
+              CommitInfo("rev4", 1004000l, false),
+              CommitInfo("rev5", 1005000l, false)
+            )
+
+          case UserActivitySummary(_,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   num_commits,
+                                   _,
+                                   _,
+                                   _,
+                                   _,
+                                   commits,
+                                   _,
+                                   last_commit_date,
+                                   true) =>
+            num_commits shouldBe 2
+            last_commit_date shouldBe 1003000l
+            commits should contain only (
+              CommitInfo("rev1", 1001000l, true),
+              CommitInfo("rev3", 1003000l, true)
+            )
+        }
+      }
+
+    }
+  }
+
+  "Pimped Per Project UserActivitySummary RDD" should {
+    "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
+      import GerritEventsTransformations._
+      import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
+      import spark.implicits._
+
+      val aliasDF = sc
+        .parallelize(
+          Seq(
+            ("stefano_alias", "stefano@galarraga-org.com", "")
+          ))
+        .toDF("author", "email", "organization")
+
+      val expectedDate = System.currentTimeMillis
+
+      val analyticsJobOutput =
+        sc.parallelize(
+            Seq(
+              "project1" -> UserActivitySummary(
+                2018,
+                1,
+                20,
+                10,
+                "Stefano",
+                "stefano@galarraga-org.com",
+                1,
+                2,
+                1,
+                10,
+                4,
+                Array(CommitInfo("sha1", expectedDate, false)),
+                Array("master", "stable-2.14"),
+                expectedDate,
+                false
+              )
+            ))
+          .asEtlDataFrame(sql)
+          .addOrganization()
+          .handleAliases(Some(aliasDF))
+          .dropCommits
+
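+      // Expected row reflects the summary after alias resolution and organization extraction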
+      val expected = sc
+        .parallelize(
+          Seq(
+            ("project1",
+             "stefano_alias",
+             "stefano@galarraga-org.com",
+             2018,
+             1,
+             20,
+             10,
+             2,
+             1,
+             10,
+             4,
+             1,
+             expectedDate,
+             false,
+             Array("master", "stable-2.14"),
+             "galarraga-org")
+          ))
+        .toDF(
+          "project",
+          "author",
+          "email",
+          "year",
+          "month",
+          "day",
+          "hour",
+          "num_files",
+          "num_distinct_files",
+          "added_lines",
+          "deleted_lines",
+          "num_commits",
+          "last_commit_date",
+          "is_merge",
+          "branches",
+          "organization"
+        )
+
+      val collected = analyticsJobOutput.collect()
+
+      collected should contain theSameElementsAs expected.collect()
+    }
+  }
+
+  "removeEventsForCommits" should {
+    "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
+      import GerritEventsTransformations._
+
+      val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
+      val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
+      val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(
+        Seq(
+          aRefUpdatedEvent("oldRev", "RevToExclude1"),
+          toKeep1,
+          aRefUpdatedEvent("oldRev", "RevToExclude2"),
+          aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
+          toKeep2
+        ).map(_.event))
+
+      events
+        .removeEventsForCommits(
+          sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
+        .collect() should contain only (toKeep1.event, toKeep2.event)
+    }
+  }
+}
+
+trait EventFixture {
+
+  // Forcing early type failures
+  case class JsonEvent[T <: GerritJsonEvent](json: String) {
+    val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
+  }
+
+  val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(
+    oldRev = "863b64002f2a9922deba69407804a44703c996e0",
+    newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
+
+  val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent(
+    "I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
+
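+  // Builds a RefUpdatedEvent fixture from a minimal ref-updated JSON payload for the given revisions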
+  def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) =
+    JsonEvent[RefUpdatedEvent](
+      s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+       | "refUpdate":{"oldRev":"$oldRev",
+       | "newRev":"$newRev",
+       | "refName": "refs/heads/master","project":"subcut"},
+       |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
+
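+  // Builds a ChangeMergedEvent fixture; when isMergeCommit is set, the patch set gets two parent SHAs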
+  def aChangeMergedEvent(changeId: String,
+                         createdOnInSecs: Long = 1000l,
+                         newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
+                         isMergeCommit: Boolean = false,
+                         insertions: Integer = 0,
+                         deletions: Integer = 0,
+                         branch: String = "master") =
+    JsonEvent[ChangeMergedEvent](
+      s"""{
+       |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+       |"newRev":"$newRev",
+       |"patchSet":{
+       | "number":1,
+       | "revision":"$newRev",
+       | "parents": ${if (isMergeCommit)
+           """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]"""
+         else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
+       | "ref":"refs/changes/01/1/1",
+       | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+       | "createdOn":1516530259,
+       | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
+       | "isDraft":false,
+       | "kind":"REWORK",
+       | "sizeInsertions":$insertions,
+       | "sizeDeletions":$deletions
+       |},
+       |"change":{
+       | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
+       | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
+       | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
+       | "createdOn":1516530259,"status":"MERGED"
+       |},
+       |"project":{"name":"subcut"},
+       |"refName":"refs/heads/$branch",
+       |"changeKey":{"id":"$changeId"},
+       |"type":"change-merged",
+       |"eventCreatedOn": $createdOnInSecs
+       |}""".stripMargin)
+
+}
diff --git a/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
similarity index 97%
rename from src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
index e18e328..fe65ec5 100644
--- a/src/test/scala/com/gerritforge/analytics/model/EmailSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/model/EmailSpec.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
 
 import org.scalatest.{FlatSpec, Matchers}
 
@@ -81,4 +81,4 @@
       case _ =>
     }
   }
-}
\ No newline at end of file
+}
diff --git a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
similarity index 95%
rename from src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
index 591184b..a839430 100644
--- a/src/test/scala/com/gerritforge/analytics/model/GerritEndpointConfigTest.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfigTest.scala
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.model
+package com.gerritforge.analytics.gitcommits.model
 
 import org.scalatest.{FlatSpec, Matchers}
 
diff --git a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala b/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
similarity index 95%
rename from src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
rename to src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
index 1c9adb8..c8fc602 100644
--- a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/gitcommits/support/ops/AnalyticsTimeOpsSpec.scala
@@ -12,16 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.gerritforge.analytics.support.ops
+package com.gerritforge.analytics.gitcommits.support.ops
 
 import java.time.{LocalDate, LocalDateTime, ZoneOffset}
 
-import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import com.gerritforge.analytics.gitcommits.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 import org.scalatest.{FlatSpec, Matchers}
 
 class AnalyticsTimeOpsSpec extends FlatSpec with Matchers {
 
-
   "String parser - Given a correct string and date format" should "return an epoch value" in {
     val epochValueUTC =
       LocalDateTime
@@ -49,7 +48,6 @@
     stringDate.parseStringToLocalDate(dateFormat).get should equal(utcLocalDate)
   }
 
-
   "String parser - An incorrect string a given format" should "return None" in {
     val stringDate = "2018-01-01 12:00:00.000000000"
     val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
@@ -58,7 +56,6 @@
     stringDate.parseStringToUTCEpoch(dateFormat) should equal(None)
   }
 
-
   "Simple Date Formats" should "convert to the correct strings - yyyyMMddHH" in {
     val epochValueUTC =
       LocalDateTime
@@ -103,7 +100,6 @@
     AnalyticsDateTimeFormater.yyyy.format(epochValueUTC) should equal(yyyyStr)
   }
 
-
   "UTC conversion" should "check date operations return always UTC" in {
     val dateTime =
       LocalDateTime