Keep dates as epochMillis in the output Row

Stop relying on the Elasticsearch auto-mapping feature and pass
dates as epochMillis without post-processing.

Cutting an unneeded post-processing step is better and safer,
with less code and fewer heap allocations.
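
Consumers that still want a human-readable timestamp can derive it
from the epochMillis value on read. A minimal sketch using java.time
(the helper name is hypothetical and not part of this change; the
removed test confirms 0L corresponds to "1970-01-01T00:00:00Z"):

    import java.time.Instant
    import java.time.format.DateTimeFormatter

    // Hypothetical helper: render an epochMillis Long as an ISO-8601
    // instant on read, e.g. epochMillisToIso(0L) == "1970-01-01T00:00:00Z".
    def epochMillisToIso(millis: Long): String =
      DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(millis))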

Change-Id: Ibbc85b44c4e250fc63a792b41162394a38fd8a31
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index 8aa89d2..a34e702 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -139,11 +139,6 @@
         .getOrElse(df)
     }
 
-
-    def convertDates(columnName: String)(implicit spark: SparkSession): DataFrame = {
-      df.withColumn(columnName, longDateToISOUdf(col(columnName)))
-    }
-
     def dropCommits(implicit spark: SparkSession): DataFrame = {
       df.drop("commits")
     }
@@ -159,7 +154,6 @@
       df
         .addOrganization()
         .handleAliases(aliasesDFMaybe)
-        .convertDates("last_commit_date")
         .dropCommits
     }
   }
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 31d37c4..187b5ce 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -202,13 +202,14 @@
 
   it should "return correct columns when alias DF is not defined" in {
     import spark.implicits._
-    val inputSampleDF = sc.parallelize(Seq(
-      ("author_name", "email@mail.com", "an_organization")
-    )).toDF("author", "email", "organization")
+    val expectedTuple = ("author_name", "email@mail.com", "an_organization")
+    val inputSampleDF = sc.parallelize(Seq(expectedTuple)).toDF("author", "email", "organization")
+    val expectedRow = Row.fromTuple(expectedTuple)
 
     val df = inputSampleDF.handleAliases(None)
 
     df.schema.fields.map(_.name) should contain allOf("author", "email", "organization")
+    df.collect().head should be(expectedRow)
   }
 
   it should "lowercase aliased organizations" in {
@@ -265,27 +266,6 @@
     )
   }
 
-  "convertDates" should "process specific column from Long to ISO date" in {
-    // some notable dates converted in UnixMillisecs and ISO format
-    val DATES = Map(
-      0L -> "1970-01-01T00:00:00Z",
-      1500000000000L -> "2017-07-14T02:40:00Z",
-      1600000000000L -> "2020-09-13T12:26:40Z")
-    import sql.implicits._
-    val df = sc.parallelize(Seq(
-      ("a", 0L, 1),
-      ("b", 1500000000000L, 2),
-      ("c", 1600000000000L, 3))).toDF("name", "date", "num")
-
-    val converted = df
-      .convertDates("date")
-
-    converted.collect should contain allOf(
-      Row("a", DATES(0), 1),
-      Row("b", DATES(1500000000000L), 2),
-      Row("c", DATES(1600000000000L), 3)
-    )
-  }
 
   "extractCommitsPerProject" should "generate a Dataset with the all the SHA of commits with associated project" in {
     import sql.implicits._
diff --git a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
index bde0705..05b98dd 100644
--- a/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/engine/events/GerritEventsTransformationsSpec.scala
@@ -164,21 +164,20 @@
         ("stefano_alias", "stefano@galarraga-org.com", "")
       )).toDF("author", "email", "organization")
 
-      val expectedDate : ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC")).`with`(MILLI_OF_SECOND, 0).`with`(NANO_OF_SECOND, 0)
+      val expectedDate = System.currentTimeMillis
 
       val analyticsJobOutput =
         sc.parallelize(Seq(
-          "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate.toInstant.toEpochMilli, false)),
-            expectedDate.toInstant.toEpochMilli, false)
+          "project1" -> UserActivitySummary(2018, 1, 20, 10, "Stefano", "stefano@galarraga-org.com", 1, 2, 1, 10, 4, Array(CommitInfo("sha1", expectedDate, false)),
+            expectedDate, false)
         ))
           .asEtlDataFrame(sql)
           .addOrganization()
           .handleAliases(Some(aliasDF))
-          .convertDates("last_commit_date")
           .dropCommits
 
       val expected = sc.parallelize(Seq(
-        ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), false, "galarraga-org")
+        ("project1", "stefano_alias", "stefano@galarraga-org.com", 2018, 1, 20, 10, 2, 1, 10, 4, 1, expectedDate, false, "galarraga-org")
       )).toDF("project", "author", "email", "year", "month", "day", "hour", "num_files", "num_distinct_files",
         "added_lines", "deleted_lines", "num_commits", "last_commit_date", "is_merge", "organization")