Remove Change processing from events stream

This feature is not currently used and does not work in its current state.
It was a POC to check the feasibility of combining batch and stream event processing.
The code is getting in the way of new feature development, hence
the decision to remove it until the need for stream processing arises again.

Feature: Issue 10211
Change-Id: I4a3e993b80ae0c43fa5248ec2acfe10da9b6ed5c
diff --git a/README.md b/README.md
index 0d723b3..165c68a 100644
--- a/README.md
+++ b/README.md
@@ -36,8 +36,6 @@
     --since 2000-06-01 \
     --aggregate email_hour \
     --url http://gerrit.mycompany.com \
-    --events file:///tmp/gerrit-events-export.json \
-    --writeNotProcessedEventsTo file:///tmp/failed-events \
     -e gerrit \
     --username gerrit-api-username \
     --password gerrit-api-password
@@ -49,7 +47,7 @@
 docker run -ti --rm \
     -e ES_HOST="es.mycompany.com" \
     -e GERRIT_URL="http://gerrit.mycompany.com" \
-    -e ANALYTICS_ARGS="--since 2000-06-01 --aggregate email_hour --writeNotProcessedEventsTo file:///tmp/failed-events -e gerrit" \
+    -e ANALYTICS_ARGS="--since 2000-06-01 --aggregate email_hour -e gerrit" \
     gerritforge/gerrit-analytics-etl-gitcommits:latest
 ```
 
@@ -63,11 +61,6 @@
     if not provided data is saved to </tmp>/analytics-<NNNN> where </tmp> is
     the system temporary directory
 - -a --email-aliases (*optional*) "emails to author alias" input data path.
-
-- --events location where to load the Gerrit Events
-    If not specified events will be ignored
-- --writeNotProcessedEventsTo location where to write a TSV file containing the events we couldn't process
-    with a description fo the reason why
 - -k --ignore-ssl-cert allows to proceed even for server connections otherwise considered insecure.
 
 
@@ -207,7 +200,7 @@
           --network analytics-etl_ek \
           -e ES_HOST="elasticsearch" \
           -e GERRIT_URL="http://$HOST_IP:8080" \
-          -e ANALYTICS_ARGS="--since 2000-06-01 --aggregate email_hour --writeNotProcessedEventsTo file:///tmp/failed-events -e gerrit" \
+          -e ANALYTICS_ARGS="--since 2000-06-01 --aggregate email_hour -e gerrit" \
           gerritforge/gerrit-analytics-etl-gitcommits:latest
   ```
 
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
index a98e92b..af185aa 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/GerritAnalyticsTransformations.scala
@@ -181,7 +181,7 @@
       ZoneOffset.UTC, ZoneId.of("Z")
     ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
 
-  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String], gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession) = {
+  def getContributorStats(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String], gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession) = {
     import spark.sqlContext.implicits._ // toDF
 
     projects
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
deleted file mode 100644
index b56babe..0000000
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/AggregationStrategy.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright (C) 2017 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.gitcommits.engine.events
-
-import java.text.DateFormat
-import java.time.LocalDateTime
-
-import com.gerritforge.analytics.support.ops.{AnalyticsDateTimeFormatter, CommonTimeOperations}
-
-import scala.util.Try
-
-sealed trait AggregationStrategy extends Serializable {
-  def aggregationKey(event: GerritJsonEvent): String
-
-  case class DateTimeParts(year: Integer, month: Integer, day: Integer, hour: Integer)
-
-  def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts = {
-    val date: LocalDateTime = CommonTimeOperations.utcDateTimeFromEpoch(event.eventCreatedOn)
-    DateTimeParts(date.getYear, date.getMonthValue, date.getDayOfMonth, date.getHour)
-  }
-}
-
-object AggregationStrategy {
-
-  @throws(classOf[IllegalArgumentException])
-  def byName(aggregationName: String): Try[AggregationStrategy] = Try {
-    aggregationName.toUpperCase match {
-      case "EMAIL" => aggregateByEmail
-      case "EMAIL_YEAR" => aggregateByEmailAndYear
-      case "EMAIL_MONTH" => aggregateByEmailAndMonth
-      case "EMAIL_DAY" => aggregateByEmailAndDay
-      case "EMAIL_HOUR" => aggregateByEmailAndHour
-      case unsupported =>
-        throw new IllegalArgumentException(s"Unsupported aggregation '$aggregationName")
-    }
-  }
-
-  object aggregateByEmail extends AggregationStrategy {
-    override def aggregationKey(event: GerritJsonEvent): String = event.account.email
-  }
-
-  trait EmailAndTimeBasedAggregation extends AggregationStrategy {
-    val dateFormat: DateFormat
-
-    final override def aggregationKey(event: GerritJsonEvent): String = {
-      s"${event.account.email}/${dateFormat.format(event.eventCreatedOn)}"
-    }
-  }
-
-  object aggregateByEmailAndHour extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormatter.yyyyMMddHH
-  }
-
-  object aggregateByEmailAndDay extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormatter.yyyyMMdd
-
-    override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
-      super.decomposeTimeOfAggregatedEvent(event).copy(hour = 0)
-  }
-
-  object aggregateByEmailAndMonth extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormatter.yyyyMM
-
-    override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
-      super.decomposeTimeOfAggregatedEvent(event).copy(day = 0, hour = 0)
-  }
-
-  object aggregateByEmailAndYear extends EmailAndTimeBasedAggregation {
-    val dateFormat = AnalyticsDateTimeFormatter.yyyy
-
-    override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
-      super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
-  }
-
-}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
deleted file mode 100644
index 07d9bc1..0000000
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformations.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-// Copyright (C) 2018 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.gitcommits.engine.events
-
-import java.time.{LocalDateTime, ZoneOffset}
-
-import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{ CommitInfo, UserActivitySummary }
-import com.typesafe.scalalogging.LazyLogging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
-
-import scala.util.{Failure, Success}
-
-object GerritEventsTransformations extends LazyLogging {
-
-  case class NotParsableJsonEvent(source: String, failureDescription: String)
-
-  implicit class PimpedJsonRDD(val self: RDD[String]) extends AnyVal {
-    def parseEvents(implicit eventParser: GerritJsonEventParser): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = {
-      self.map(tryParseGerritTriggeredEvent)
-    }
-  }
-
-  implicit class PimpedGerritJsonEventRDD[T <: GerritJsonEvent](val self: RDD[T]) extends AnyVal {
-
-    /**
-      * Returns the UTC date of the earliest event in the RDD
-      *
-      * @return
-      */
-    def earliestEventTime: LocalDateTime = {
-      val earliestEpoch = self.map { event =>
-        event.eventCreatedOn
-      }.min()
-
-      LocalDateTime.ofEpochSecond(earliestEpoch, 0, ZoneOffset.UTC)
-    }
-
-    def repositoryWithNewRevisionEvents: RDD[GerritRefHasNewRevisionEvent] = self.collect { case e: GerritRefHasNewRevisionEvent => e }
-  }
-
-  implicit class PimpedRepositoryModifiedGerritEventRDD(val self: RDD[GerritRefHasNewRevisionEvent]) extends AnyVal {
-    def removeEventsForCommits(commitsToExclude: RDD[String]): RDD[GerritRefHasNewRevisionEvent] = {
-      self
-        .keyBy(_.newRev)
-        .leftOuterJoin(commitsToExclude.keyBy(identity))
-        .flatMap[GerritRefHasNewRevisionEvent] {
-        case (_, (_, Some(_))) => None
-
-        case (_, (event, None)) => Some(event)
-      }
-    }
-
-    def userActivitySummaryPerProject(aggregationStrategy: AggregationStrategy, eventParser: GerritJsonEventParser): RDD[(String, UserActivitySummary)] = {
-      self
-        .collect { case e: ChangeMergedEvent => e }
-        .groupBy { event =>
-          (event.change.project, aggregationStrategy.aggregationKey(event))
-        }.flatMap { case ((project, _), changesPerUserAndTimeWindow) =>
-        val dateParts: aggregationStrategy.DateTimeParts = aggregationStrategy.decomposeTimeOfAggregatedEvent(changesPerUserAndTimeWindow.head)
-
-        //          val branchesPerCommitForThisProject = broadcastBranchesPerCommitByProject.value.getOrElse(project, Map.empty)
-
-        extractUserActivitySummary(
-          changesPerUserAndTimeWindow, dateParts.year, dateParts.month, dateParts.day, dateParts.hour
-        ).map(
-          project -> _
-        )
-      }
-    }
-  }
-
-  implicit class PimpedUserActivitySummaryPerProjectRDD(val self: RDD[(String, UserActivitySummary)]) extends AnyVal {
-    def asEtlDataFrame(implicit sqlContext: SQLContext): DataFrame = {
-      convertAsDataFrame(self)
-    }
-  }
-
-  def tryParseGerritTriggeredEvent(eventJson: String)(implicit eventParser: GerritJsonEventParser): Either[NotParsableJsonEvent, GerritJsonEvent] = {
-    eventParser.fromJson(eventJson) match {
-      case Success(event) => Right(event)
-      case Failure(exception) =>
-        logger.warn(s"Unable to parse event '$eventJson'", exception)
-        Left(NotParsableJsonEvent(eventJson, exception.getMessage.replace("\n", " - ")))
-    }
-  }
-
-  private def addCommitSummary(summaryTemplate: UserActivitySummary, changes: Iterable[ChangeMergedEvent]): UserActivitySummary = {
-    // We assume all the changes are consistent on the is_merge filter
-    summaryTemplate.copy(
-      num_commits = changes.size,
-      commits = changes.map(changeMerged =>
-        CommitInfo(changeMerged.newRev, changeMerged.eventCreatedOn * 1000, changeMerged.patchSet.parents.size > 1)
-      ).toArray,
-      branches = changes.map(changeMerged => changeMerged.change.branch).toArray,
-      last_commit_date = changes.map(_.eventCreatedOn).max * 1000,
-      added_lines = changes.map(_.patchSet.sizeInsertions).sum,
-      deleted_lines = changes.map(_.patchSet.sizeDeletions).sum,
-      // We cannot calculate anything about files until we export the list of files in the gerrit event
-      num_files = 0,
-      num_distinct_files = 0
-    )
-  }
-
-  /**
-    * Extract up to two UserActivitySummary object from the given iterable of ChangeMerged events
-    * depending if there is a mix of merge and non merge changes
-    */
-  def extractUserActivitySummary(changes: Iterable[ChangeMergedEvent],
-                                 //commitIdToBranchesMap: Map[String, Set[String]],
-                                 year: Integer,
-                                 month: Integer,
-                                 day: Integer,
-                                 hour: Integer): Iterable[UserActivitySummary] = {
-
-    changes.headOption.fold(List.empty[UserActivitySummary]) { firstChange =>
-      val name = firstChange.account.name
-      val email = firstChange.account.email
-
-      val summaryTemplate = UserActivitySummary(
-        year = year,
-        month = month,
-        day = day,
-        hour = hour,
-        name = name,
-        email = email,
-        num_commits = 0,
-        num_files = 0,
-        num_distinct_files = 0,
-        added_lines = 0,
-        deleted_lines = 0,
-        commits = Array.empty,
-        branches = Array.empty,
-        last_commit_date = 0l,
-        is_merge = false)
-
-      val (mergeCommits, nonMergeCommits) =
-        changes.foldLeft(List.empty[ChangeMergedEvent], List.empty[ChangeMergedEvent]) { case ((mergeCommits, nonMergeCommits), currentCommit) =>
-          if (currentCommit.patchSet.parents.size > 1) {
-            (currentCommit :: mergeCommits, nonMergeCommits)
-          } else {
-            (mergeCommits, currentCommit :: nonMergeCommits)
-          }
-        }
-
-      List(
-        (summaryTemplate.copy(is_merge = true), mergeCommits.reverse),
-        (summaryTemplate.copy(is_merge = false), nonMergeCommits.reverse)
-      )
-        .filter(_._2.nonEmpty)
-        .map { case (template, commits) =>
-          addCommitSummary(template, commits)
-        }
-    }
-  }
-
-  def convertAsDataFrame(self: RDD[(String, UserActivitySummary)])(implicit sqlContext: SQLContext): DataFrame = {
-    import sqlContext.implicits._
-
-    self.map { case (project, summary) =>
-      (project, summary.name, summary.email, summary.year, summary.month, summary.day, summary.hour, summary.num_files, summary.num_distinct_files,
-        summary.added_lines, summary.deleted_lines, Option(summary.num_commits), Option(summary.last_commit_date), Option(summary.is_merge), summary.commits, summary.branches)
-    }.toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
-      "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "commits", "branches")
-  }
-
-  def getContributorStatsFromGerritEvents(events: RDD[GerritRefHasNewRevisionEvent],
-                                          commitsToExclude: RDD[String],
-                                          aggregationStrategy: AggregationStrategy)(implicit spark: SparkSession) = {
-    implicit val sqlCtx = spark.sqlContext
-
-    events
-      .removeEventsForCommits(commitsToExclude)
-      .userActivitySummaryPerProject(aggregationStrategy, EventParser)
-      .asEtlDataFrame
-  }
-
-}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
deleted file mode 100644
index fe86d01..0000000
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/engine/events/model.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright (C) 2018 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.gitcommits.engine.events
-
-import com.typesafe.scalalogging.LazyLogging
-import scala.util.{Failure, Success, Try}
-
-// for documentation about this object model see https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html
-
-trait GerritJsonEventParser extends Serializable {
-  def fromJson(json: String): Try[GerritJsonEvent]
-}
-
-object EventParser extends GerritJsonEventParser with LazyLogging {
-  val ChangeMergedEventType = "change-merged"
-  val RefUpdatedEventType = "ref-updated"
-
-  override def fromJson(json: String): Try[GerritJsonEvent] = {
-    import org.json4s._
-    import org.json4s.native.JsonMethods._
-
-    implicit val formats: DefaultFormats.type = DefaultFormats
-
-    def tryParse[T](jvalue: JValue)(implicit formats: Formats, mf: scala.reflect.Manifest[T]): Try[T] = {
-      try {
-        Success(jvalue.extract[T])
-      } catch {
-        case e: MappingException =>
-          Failure(MappingException(s"Invalid event of type `${mf.runtimeClass.getName}`: ${e.getMessage}", e))
-      }
-    }
-
-    Try(parse(json)).flatMap { parsedJson =>
-      parsedJson \ "type" match {
-        case JString(ChangeMergedEventType) =>
-          tryParse[ChangeMergedEvent](parsedJson)
-
-        case JString(RefUpdatedEventType) =>
-          tryParse[RefUpdatedEvent](parsedJson)
-
-        case JNothing =>
-          Failure(new IllegalArgumentException("Invalid JSON object received, missing 'type' field"))
-
-        case JString(unsupportedType) =>
-          Failure(new IllegalArgumentException(s"Unsupported event type '$unsupportedType'"))
-
-        case unexpectedJsonType =>
-          Failure(new IllegalArgumentException(s"Invalid JSON format for field 'type' `${unexpectedJsonType.getClass.getName}`"))
-      }
-    }
-  }
-}
-
-case class GerritAccount(name: String, email: String, username: String)
-object GerritAccount {
-  val NoAccountInfo = GerritAccount("", "", "")
-}
-
-case class GerritComment(reviewer: GerritAccount, message: String)
-
-case class GerritChange(project: String,
-                        branch: String,
-                        topic: Option[String],
-                        id: String,
-                        number: String,
-                        subject: String,
-                        commitMessage: String,
-                        owner: GerritAccount,
-                        url: String,
-                        createdOn: Long,
-                        lastUpdated: Option[Long],
-                        comments: List[GerritComment])
-
-case class GerritApproval(
-                           `type`: String,
-                           value: String,
-                           updated: Boolean = false,
-                           oldValue: String,
-                           by: GerritAccount
-                         )
-
-case class GerritPatchSet(
-                           number: String,
-                           revision: String,
-                           ref: String,
-                           draft: Option[Boolean] ,
-                           kind: String,
-                           uploader: GerritAccount,
-                           author: GerritAccount,
-                           approvals: List[GerritApproval],
-                           parents: List[String],
-                           createdOn: Long,
-                           sizeInsertions: Int,
-                           sizeDeletions: Int
-                         ) {
-  def isDraft : Boolean = draft.getOrElse(false)
-}
-
-sealed trait GerritJsonEvent {
-  def `type`: String
-
-  def eventCreatedOn: Long
-
-  def account: GerritAccount
-}
-
-sealed trait GerritRepositoryModifiedEvent extends GerritJsonEvent {
-  def modifiedProject: String
-
-  def modifiedRef: String
-}
-
-sealed trait GerritRefHasNewRevisionEvent extends GerritRepositoryModifiedEvent {
-  def newRev: String
-}
-
-sealed trait GerritChangeBasedEvent extends GerritRefHasNewRevisionEvent {
-  def change: GerritChange
-
-  def patchSet: GerritPatchSet
-
-  //def files: Set[String]
-}
-
-case class GerritChangeKey(id: String, `type`: Option[String], eventCreatedOn: Option[Long])
-
-//https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html#_change_merged
-case class ChangeMergedEvent(
-                              override val change: GerritChange,
-                              override val patchSet: GerritPatchSet,
-                              submitter: GerritAccount,
-                              override val newRev: String,
-                              override val eventCreatedOn: Long,
-                              changeKey: GerritChangeKey
-                            ) extends GerritChangeBasedEvent {
-  override val `type`: String = "change-merged"
-
-  override def account: GerritAccount = submitter
-
-  def modifiedProject: String = change.project
-
-  def modifiedRef: String = patchSet.ref
-}
-
-case class GerritRefUpdate(project: String, refName: String, oldRev: String, newRev: String)
-
-case class RefUpdatedEvent(
-                            refUpdate: GerritRefUpdate,
-                            submitter: Option[GerritAccount],
-                            override val eventCreatedOn: Long
-                          ) extends GerritRefHasNewRevisionEvent {
-  override val `type`: String = "ref-updated"
-
-  override def account: GerritAccount = submitter.getOrElse(GerritAccount.NoAccountInfo)
-
-  def modifiedProject: String = refUpdate.project
-
-  def modifiedRef: String = refUpdate.refName
-
-  def newRev: String = refUpdate.newRev
-}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 996a553..dd9f6bb 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -17,19 +17,15 @@
 
 import java.time.LocalDate
 
-import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import com.gerritforge.analytics.gitcommits.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
 import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
 import com.gerritforge.analytics.spark.SparkApp
 import com.gerritforge.analytics.support.ops.ReadsOps._
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import scopt.OptionParser
 
 import scala.io.Codec
-import scala.util.{Failure, Success}
 
 object Main extends App with SparkApp with Job with LazyLogging with FetchRemoteProjects {
   override val appName = "Gerrit GitCommits Analytics ETL"
@@ -70,12 +66,6 @@
       opt[String]('a', "email-aliases") optional () validate fileExists action { (path, c) =>
         c.copy(emailAlias = Some(path))
       } text "\"emails to author alias\" input data path"
-      opt[String]("events") optional () action { (eventsPath, config) =>
-        config.copy(eventsPath = Some(eventsPath))
-      } text "location where to load the Gerrit Events"
-      opt[String]("writeNotProcessedEventsTo") optional () action { (failedEventsPath, config) =>
-        config.copy(eventsFailureOutputPath = Some(failedEventsPath))
-      } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
 
       opt[String]("username") optional () action { (input, c) =>
         c.copy(username = Some(input))
@@ -121,22 +111,9 @@
 
   def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
     import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
-    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
 
     implicit val sc: SparkContext = spark.sparkContext
 
-    val aggregationStrategy: AggregationStrategy =
-      config.aggregate
-        .flatMap { agg =>
-          AggregationStrategy.byName(agg) match {
-            case Success(strategy) => Some(strategy)
-            case Failure(e) =>
-              logger.warn(s"Invalid aggregation strategy '$agg", e)
-              None
-          }
-        }
-        .getOrElse(AggregationStrategy.aggregateByEmail)
-
     val projects = fetchProjects(config)
 
     logger.info(
@@ -145,78 +122,10 @@
 
     val aliasesDF = getAliasDF(config.emailAlias)
 
-    val events = loadEvents
-
-    val failedEvents: RDD[NotParsableJsonEvent] = events.collect {
-      case Left(eventFailureDescription) => eventFailureDescription
-    }
-
-    if (!failedEvents.isEmpty()) {
-      config.eventsFailureOutputPath.foreach { failurePath =>
-        logger.info(s"Events failures will be stored at '$failurePath'")
-
-        import spark.implicits._
-        failedEvents.toDF().write.option("sep", "\t").option("header", true).csv(failurePath)
-      }
-    }
-
-    //We might want to use the time of the events as information to feed to the collection of data from the repository
-    val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
-
-    val firstEventDateMaybe: Option[LocalDate] =
-      if (repositoryAlteringEvents.isEmpty()) None
-      else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
-
-    val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
-      val lastAggregationDate = firstEventDate.plusMonths(1)
-      if (lastAggregationDate.isBefore(LocalDate.now())) {
-        logger.info(
-          s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events are available until $firstEventDate")
-        config.copy(until = Some(lastAggregationDate))
-      } else {
-        config
-      }
-    }
-
-    val statsFromAnalyticsPlugin =
-      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects),
-                                             configWithOverriddenUntil.contributorsUrl,
+    val contributorsStats = getContributorStats(spark.sparkContext.parallelize(projects),
+                                             config.contributorsUrl,
                                              config.gerritApiConnection)
-
-    val statsFromEvents = getContributorStatsFromGerritEvents(
-      repositoryAlteringEvents,
-      statsFromAnalyticsPlugin.commitSet.rdd,
-      aggregationStrategy)
-
-    val mergedEvents = if (statsFromEvents.head(1).isEmpty) {
-      statsFromAnalyticsPlugin
-    } else {
-      require(
-        statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
-        s""" Schemas from the stats collected from events and from the analytics datasets differs!!
-           | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
-           | From gerrit events: ${statsFromEvents.schema}
-      """.stripMargin
-      )
-
-      (statsFromAnalyticsPlugin union statsFromEvents)
-    }
-
-    mergedEvents.dashboardStats(aliasesDF)
-  }
-
-  def loadEvents(
-      implicit config: GerritEndpointConfig,
-      spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
-    import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations._
-
-    config.eventsPath.fold(
-      spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
-      spark.read
-        .textFile(eventsPath)
-        .rdd
-        .parseEvents(EventParser)
-    }
+    contributorsStats.dashboardStats(aliasesDF)
   }
 
   def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
index 99de66d..4b5b9fc 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/model/GerritEndpointConfig.scala
@@ -29,8 +29,6 @@
     until: Option[LocalDate] = None,
     aggregate: Option[String] = None,
     emailAlias: Option[String] = None,
-    eventsPath: Option[String] = None,
-    eventsFailureOutputPath: Option[String] = None,
     username: Option[String] = None,
     password: Option[String] = None,
     ignoreSSLCert: Option[Boolean] = None,
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index bf42ee2..fccfc81 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -3,7 +3,6 @@
 import java.sql.Timestamp
 import java.time.LocalDate
 
-import com.gerritforge.analytics.gitcommits.engine.events.AggregationStrategy
 import com.gerritforge.analytics.gitcommits.job.{FetchProjects, Job}
 import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
 import com.google.gerrit.server.project.ProjectControl
@@ -11,10 +10,9 @@
 import com.google.inject.Inject
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.kohsuke.args4j.{Argument, Option => ArgOption}
 
-import scala.io.Source
 import scala.util.{Failure, Success}
 
 @CommandMetaData(name = "processGitCommits",
diff --git a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala b/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
deleted file mode 100644
index df3d1f5..0000000
--- a/gitcommits/src/test/scala/com/gerritforge/analytics/gitcommits/engine/events/GerritEventsTransformationsSpec.scala
+++ /dev/null
@@ -1,420 +0,0 @@
-// Copyright (C) 2018 GerritForge Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.gerritforge.analytics.gitcommits.engine.events
-import com.gerritforge.analytics.SparkTestSupport
-import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations.{CommitInfo, UserActivitySummary}
-import com.gerritforge.analytics.gitcommits.engine.events.GerritEventsTransformations.NotParsableJsonEvent
-import org.apache.spark.rdd.RDD
-import org.scalatest.{Inside, Matchers, WordSpec}
-
-class GerritEventsTransformationsSpec
-    extends WordSpec
-    with Matchers
-    with SparkTestSupport
-    with Inside
-    with EventFixture {
-
-  "tryParseGerritTriggeredEvent" should {
-
-    implicit val eventParser: GerritJsonEventParser = EventParser
-
-    "Parse a correctly formed event" in new EventFixture {
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(refUpdated.json) shouldBe Right(
-        refUpdated.event)
-    }
-
-    "Return a description of the failure in with the original event source a Left object if the JSON provided is invalid" in {
-      val invalidJson = "invalid json string"
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(invalidJson) shouldBe Left(
-        NotParsableJsonEvent(invalidJson, "unknown token i - Near: i"))
-    }
-
-    "Return a description of the failure with the original event source  in a Left object if the JSON event is not supported" in {
-      val unsupportedEvent = """{"type":"ref-updated-UNSUPPORTED","eventCreatedOn":1516531868}"""
-
-      GerritEventsTransformations.tryParseGerritTriggeredEvent(unsupportedEvent) shouldBe Left(
-        NotParsableJsonEvent(unsupportedEvent, "Unsupported event type 'ref-updated-UNSUPPORTED'"))
-    }
-  }
-
-  "PimpedJsonRDD" should {
-    "Convert an RDD of JSON events into an RDD of events or unparsed json strings" in {
-      val jsonRdd = sc.parallelize(Seq(refUpdated.json, changeMerged.json, "invalid json string"))
-
-      import GerritEventsTransformations._
-
-      jsonRdd.parseEvents(EventParser).collect() should contain only (
-        Right(refUpdated.event),
-        Right(changeMerged.event),
-        Left(NotParsableJsonEvent("invalid json string", "unknown token i - Near: i"))
-      )
-    }
-  }
-
-  "extractUserActivitySummary" should {
-    implicit val eventParser: GerritJsonEventParser = EventParser
-
-    "Build one UserActivitySummary object if given a series of non-merge commits" in {
-      val events: Seq[ChangeMergedEvent] = Seq(
-        aChangeMergedEvent("1",
-                           1001l,
-                           newRev = "rev1",
-                           insertions = 2,
-                           deletions = 1,
-                           branch = "stable-1.14"),
-        aChangeMergedEvent("2",
-                           1002l,
-                           newRev = "rev2",
-                           insertions = 3,
-                           deletions = 0,
-                           branch = "stable-1.14"),
-        aChangeMergedEvent("3",
-                           1003l,
-                           newRev = "rev3",
-                           insertions = 1,
-                           deletions = 4,
-                           branch = "stable-1.14"),
-        aChangeMergedEvent("4",
-                           1004l,
-                           newRev = "rev4",
-                           insertions = 0,
-                           deletions = 2,
-                           branch = "stable-1.14"),
-        aChangeMergedEvent("5",
-                           1005l,
-                           newRev = "rev5",
-                           insertions = 1,
-                           deletions = 1,
-                           branch = "stable-1.14")
-      ).map(_.event)
-
-      val summaries: Iterable[UserActivitySummary] =
-        GerritEventsTransformations.extractUserActivitySummary(
-          changes = events,
-          year = 2018,
-          month = 1,
-          day = 10,
-          hour = 1
-        )
-
-      summaries should have size 1
-
-      inside(summaries.head) {
-        case UserActivitySummary(year,
-                                 month,
-                                 day,
-                                 hour,
-                                 name,
-                                 email,
-                                 num_commits,
-                                 _,
-                                 _,
-                                 added_lines,
-                                 deleted_lines,
-                                 commits,
-                                 branches,
-                                 last_commit_date,
-                                 is_merge) =>
-          year shouldBe 2018
-          month shouldBe 1
-          day shouldBe 10
-          hour shouldBe 1
-          name shouldBe "Administrator"
-          email shouldBe "admin@example.com"
-          num_commits shouldBe events.size
-          last_commit_date shouldBe 1005000l
-          is_merge shouldBe false
-          added_lines shouldBe 7
-          deleted_lines shouldBe 8
-          commits should contain only (
-            CommitInfo("rev1", 1001000l, false),
-            CommitInfo("rev2", 1002000l, false),
-            CommitInfo("rev3", 1003000l, false),
-            CommitInfo("rev4", 1004000l, false),
-            CommitInfo("rev5", 1005000l, false)
-          )
-          branches should contain only "stable-1.14"
-      }
-    }
-
-    "Build two UserActivitySummaries object if given a mixed series of merge and non merge commits" in {
-      val events: Seq[ChangeMergedEvent] = Seq(
-        aChangeMergedEvent("1", 1001l, newRev = "rev1", isMergeCommit = true),
-        aChangeMergedEvent("2", 1002l, newRev = "rev2"),
-        aChangeMergedEvent("3", 1003l, newRev = "rev3", isMergeCommit = true),
-        aChangeMergedEvent("4", 1004l, newRev = "rev4"),
-        aChangeMergedEvent("5", 1005l, newRev = "rev5")
-      ).map(_.event)
-
-      val summaries: Iterable[UserActivitySummary] =
-        GerritEventsTransformations.extractUserActivitySummary(
-          changes = events,
-          year = 2018,
-          month = 1,
-          day = 10,
-          hour = 1
-        )
-
-      summaries should have size 2
-
-      summaries.foreach { summary =>
-        inside(summary) {
-          case UserActivitySummary(year,
-                                   month,
-                                   day,
-                                   hour,
-                                   name,
-                                   email,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _) =>
-            year shouldBe 2018
-            month shouldBe 1
-            day shouldBe 10
-            hour shouldBe 1
-            name shouldBe "Administrator"
-            email shouldBe "admin@example.com"
-        }
-      }
-
-      summaries.foreach { summary =>
-        inside(summary) {
-          case UserActivitySummary(_,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   num_commits,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   commits,
-                                   _,
-                                   last_commit_date,
-                                   false) =>
-            num_commits shouldBe 3
-            last_commit_date shouldBe 1005000l
-            commits should contain only (
-              CommitInfo("rev2", 1002000l, false),
-              CommitInfo("rev4", 1004000l, false),
-              CommitInfo("rev5", 1005000l, false)
-            )
-
-          case UserActivitySummary(_,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   num_commits,
-                                   _,
-                                   _,
-                                   _,
-                                   _,
-                                   commits,
-                                   _,
-                                   last_commit_date,
-                                   true) =>
-            num_commits shouldBe 2
-            last_commit_date shouldBe 1003000l
-            commits should contain only (
-              CommitInfo("rev1", 1001000l, true),
-              CommitInfo("rev3", 1003000l, true)
-            )
-        }
-      }
-
-    }
-  }
-
-  "Pimped Per Project UserActivitySummary RDD" should {
-    "Allow conversion to a DataFrame equivalent to what extracted from the Analytics plugin" in {
-      import GerritEventsTransformations._
-      import com.gerritforge.analytics.gitcommits.engine.GerritAnalyticsTransformations._
-      import spark.implicits._
-
-      val aliasDF = sc
-        .parallelize(
-          Seq(
-            ("stefano_alias", "stefano@galarraga-org.com", "")
-          ))
-        .toDF("author", "email", "organization")
-
-      val expectedDate = System.currentTimeMillis
-
-      val analyticsJobOutput =
-        sc.parallelize(
-            Seq(
-              "project1" -> UserActivitySummary(
-                2018,
-                1,
-                20,
-                10,
-                "Stefano",
-                "stefano@galarraga-org.com",
-                1,
-                2,
-                1,
-                10,
-                4,
-                Array(CommitInfo("sha1", expectedDate, false)),
-                Array("master", "stable-2.14"),
-                expectedDate,
-                false
-              )
-            ))
-          .asEtlDataFrame(sql)
-          .addOrganization()
-          .handleAliases(Some(aliasDF))
-          .dropCommits
-
-      val expected = sc
-        .parallelize(
-          Seq(
-            ("project1",
-             "stefano_alias",
-             "stefano@galarraga-org.com",
-             2018,
-             1,
-             20,
-             10,
-             2,
-             1,
-             10,
-             4,
-             1,
-             expectedDate,
-             false,
-             Array("master", "stable-2.14"),
-             "galarraga-org")
-          ))
-        .toDF(
-          "project",
-          "author",
-          "email",
-          "year",
-          "month",
-          "day",
-          "hour",
-          "num_files",
-          "num_distinct_files",
-          "added_lines",
-          "deleted_lines",
-          "num_commits",
-          "last_commit_date",
-          "is_merge",
-          "branches",
-          "organization"
-        )
-
-      val collected = analyticsJobOutput.collect()
-
-      collected should contain theSameElementsAs expected.collect()
-    }
-  }
-
-  "removeEventsForCommits" should {
-    "remove any event modifying a gerrit repo to go toward the commits to be excluded" in {
-      import GerritEventsTransformations._
-
-      val toKeep1 = aRefUpdatedEvent("oldRev", "RevToKeep")
-      val toKeep2 = aChangeMergedEvent(changeId = "changeId2", newRev = "RevToKeep2")
-      val events: RDD[GerritRefHasNewRevisionEvent] = sc.parallelize(
-        Seq(
-          aRefUpdatedEvent("oldRev", "RevToExclude1"),
-          toKeep1,
-          aRefUpdatedEvent("oldRev", "RevToExclude2"),
-          aChangeMergedEvent(changeId = "changeId1", newRev = "RevToExclude3"),
-          toKeep2
-        ).map(_.event))
-
-      events
-        .removeEventsForCommits(
-          sc.parallelize(Seq("RevToExclude1", "RevToExclude2", "RevToExclude3")))
-        .collect() should contain only (toKeep1.event, toKeep2.event)
-    }
-  }
-}
-
-trait EventFixture {
-
-  // Forcing early type failures
-  case class JsonEvent[T <: GerritJsonEvent](json: String) {
-    val event: T = EventParser.fromJson(json).get.asInstanceOf[T]
-  }
-
-  val refUpdated: JsonEvent[RefUpdatedEvent] = aRefUpdatedEvent(
-    oldRev = "863b64002f2a9922deba69407804a44703c996e0",
-    newRev = "d3131be8d7c920badd28b70d8c039682568c8de5")
-
-  val changeMerged: JsonEvent[ChangeMergedEvent] = aChangeMergedEvent(
-    "I5e6b5a3bbe8a29fb0393e4a28da536e0a198b755")
-
-  def aRefUpdatedEvent(oldRev: String, newRev: String, createdOn: Long = 1000l) =
-    JsonEvent[RefUpdatedEvent](
-      s"""{"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "refUpdate":{"oldRev":"$oldRev",
-       | "newRev":"$newRev",
-       | "refName": "refs/heads/master","project":"subcut"},
-       |"type":"ref-updated","eventCreatedOn":$createdOn}""".stripMargin)
-
-  def aChangeMergedEvent(changeId: String,
-                         createdOnInSecs: Long = 1000l,
-                         newRev: String = "863b64002f2a9922deba69407804a44703c996e0",
-                         isMergeCommit: Boolean = false,
-                         insertions: Integer = 0,
-                         deletions: Integer = 0,
-                         branch: String = "master") =
-    JsonEvent[ChangeMergedEvent](
-      s"""{
-       |"submitter":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       |"newRev":"$newRev",
-       |"patchSet":{
-       | "number":1,
-       | "revision":"$newRev",
-       | "parents": ${if (isMergeCommit)
-           """["4a4e59272f1f88824d805c0f4233c1ee7331e986", "4a4e59272f1f88824d805c0f4233c1ee7331e987"]"""
-         else """["4a4e59272f1f88824d805c0f4233c1ee7331e986"]"""},
-       | "ref":"refs/changes/01/1/1",
-       | "uploader":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "createdOn":1516530259,
-       | "author":{"name":"Stefano Galarraga","email":"galarragas@gmail.com","username":""},
-       | "isDraft":false,
-       | "kind":"REWORK",
-       | "sizeInsertions":$insertions,
-       | "sizeDeletions":$deletions
-       |},
-       |"change":{
-       | "project":"subcut","branch":"$branch","topic":"TestEvents","id":"$changeId","number":1,"subject":"Generating some changes to test events",
-       | "owner":{"name":"Administrator","email":"admin@example.com","username":"admin"},
-       | "url":"http://842860da5b33:8080/1","commitMessage":"Generating some changes to test events Change-Id: $changeId",
-       | "createdOn":1516530259,"status":"MERGED"
-       |},
-       |"project":{"name":"subcut"},
-       |"refName":"refs/heads/$branch",
-       |"changeKey":{"id":"$changeId"},
-       |"type":"change-merged",
-       |"eventCreatedOn": $createdOnInSecs
-       |}""".stripMargin)
-
-}