Create separate package for date and time operations

- Move date time operations into a dedicated package
- Test date time operations
- Create reusable and tested date time formatters (usage sketched below)
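
For illustration, a sketch of how the new helpers are intended to be used
(API names as introduced by this change):

  import java.time.LocalDate
  import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
  import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.implicits._

  // Failure-safe parsing: yields None on malformed input
  val maybeDate: Option[LocalDate] =
    "2018-01-01".parseStringToLocalDate(AnalyticsDateTimeFormater.yyyy_MM_dd)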

Bug: Issue 8833
Change-Id: Iba2581bee0819d5c446c2ddf279f5d9bc3a500a9
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
index 9b5990a..95aa113 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
@@ -14,8 +14,10 @@
 
 package com.gerritforge.analytics.engine.events
 
-import java.text.{DateFormat, SimpleDateFormat}
-import java.time.{LocalDateTime, ZoneOffset}
+import java.text.DateFormat
+import java.time.LocalDateTime
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
 
 import scala.util.Try
 
@@ -25,7 +27,7 @@
   case class DateTimeParts(year: Integer, month: Integer, day: Integer, hour: Integer)
 
   def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts = {
-    val date = LocalDateTime.ofEpochSecond(event.eventCreatedOn, 0, ZoneOffset.UTC)
+    val date: LocalDateTime = CommonTimeOperations.utcDateTimeFromEpoch(event.eventCreatedOn)
     DateTimeParts(date.getYear, date.getMonthValue, date.getDayOfMonth, date.getHour)
   }
 }
@@ -58,27 +60,28 @@
   }
 
   object aggregateByEmailAndHour extends EmailAndTimeBasedAggregation {
-    val dateFormat = new SimpleDateFormat("yyyyMMddHH")
+    val dateFormat = AnalyticsDateTimeFormater.yyyyMMddHH
   }
 
   object aggregateByEmailAndDay extends EmailAndTimeBasedAggregation {
-    val dateFormat = new SimpleDateFormat("yyyyMMdd")
+    val dateFormat = AnalyticsDateTimeFormater.yyyyMMdd
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(hour = 0)
   }
 
   object aggregateByEmailAndMonth extends EmailAndTimeBasedAggregation {
-    val dateFormat = new SimpleDateFormat("yyyyMM")
+    val dateFormat = AnalyticsDateTimeFormater.yyyyMM
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(day = 0, hour = 0)
   }
 
   object aggregateByEmailAndYear extends EmailAndTimeBasedAggregation {
-    val dateFormat = new SimpleDateFormat("yyyy")
+    val dateFormat = AnalyticsDateTimeFormater.yyyy
 
     override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
       super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
   }
+
 }
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index dc7be1a..3d1c4d4 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,12 +14,13 @@
 
 package com.gerritforge.analytics.job
 
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneId}
+import java.time.LocalDate
 
 import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
 import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
 import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
@@ -41,13 +42,14 @@
     }
   }
 
-  implicit val localDateRead: Read[LocalDate] = reads { str =>
-    val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+  implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
+    val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
     try {
-      LocalDate.parse(str, format)
+      import AnalyticsTimeOps.implicits._
+      dateStr.parseStringToLocalDate(cliDateFormat).get
     } catch {
       case NonFatal(e) =>
-        throw new IllegalArgumentException(s"Invalid date '$str' expected format is '$format'")
+        throw new IllegalArgumentException(s"Invalid date '$dateStr', expected format is 'yyyy-MM-dd'")
     }
   }
 
@@ -81,7 +83,7 @@
     opt[String]("events") optional() action { (eventsPath, config) =>
       config.copy(eventsPath = Some(eventsPath))
     } text "location where to load the Gerrit Events"
-    opt[String]("writeNotProcessedEventsTo") optional() action{ (failedEventsPath, config) =>
+    opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
       config.copy(eventsFailureOutputPath = Some(failedEventsPath))
     } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
   }
@@ -107,7 +109,8 @@
   }
 }
 
-trait Job { self: LazyLogging =>
+trait Job {
+  self: LazyLogging =>
   implicit val codec = Codec.ISO8859
 
   def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -128,7 +131,7 @@
 
     val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
 
-    logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
+    logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
 
     val aliasesDF = getAliasDF(config.emailAlias)
 
@@ -136,7 +139,7 @@
 
     val failedEvents: RDD[NotParsableJsonEvent] = events.collect { case Left(eventFailureDescription) => eventFailureDescription }
 
-    if(!failedEvents.isEmpty()) {
+    if (!failedEvents.isEmpty()) {
       config.eventsFailureOutputPath.foreach { failurePath =>
         logger.info(s"Events failures will be stored at '$failurePath'")
 
@@ -148,11 +151,11 @@
     //We might want to use the time of the events as information to feed to the collection of data from the repository
     val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
 
-    val firstEventDateMaybe: Option[LocalDate] = if(repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
+    val firstEventDateMaybe: Option[LocalDate] = if (repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
 
     val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
       val lastAggregationDate = firstEventDate.plusMonths(1)
-      if(lastAggregationDate.isBefore(LocalDate.now())) {
+      if (lastAggregationDate.isBefore(LocalDate.now())) {
         logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events ara available until $firstEventDate")
         config.copy(until = Some(lastAggregationDate))
       } else {
@@ -167,17 +170,17 @@
 
     require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
       s""" Schemas from the stats collected from events and from the analytics datasets differs!!
-        | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
-        | From gerrit events: ${statsFromEvents.schema}
+         | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+         | From gerrit events: ${statsFromEvents.schema}
       """.stripMargin)
 
     (statsFromAnalyticsPlugin union statsFromEvents).dashboardStats(aliasesDF)
   }
 
-  def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent,GerritJsonEvent]] = { // toDF
+  def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
     import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
 
-    config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent,GerritJsonEvent]]) { eventsPath =>
+    config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
       spark
         .read.textFile(eventsPath).rdd
         .parseEvents(EventParser)
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 899e56a..c3ea2be 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -15,7 +15,9 @@
 package com.gerritforge.analytics.model
 
 import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneId}
+import java.time.{LocalDate, ZoneOffset}
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 
 case class GerritEndpointConfig(baseUrl: String = "",
                                 prefix: Option[String] = None,
@@ -38,7 +40,7 @@
   }
 
   @transient
-  private lazy val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+  private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
   val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
     .flatMap(queryOpt).mkString("?", "&", "")
 
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
new file mode 100644
index 0000000..f99697f
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -0,0 +1,93 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+package com.gerritforge.analytics.support.ops
+
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneOffset}
+
+package AnalyticsTimeOps {
+
+  import java.sql.Timestamp
+  import java.text.SimpleDateFormat
+  import java.time.{Instant, LocalDate, OffsetDateTime}
+  import java.util.TimeZone
+
+  import scala.util.Try
+
+
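+  /** Reusable, centrally defined date/time formatters.
+    * The java.time DateTimeFormatter values are immutable and thread-safe;
+    * the SimpleDateFormat values are mutable and NOT thread-safe.
+    */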
+  object AnalyticsDateTimeFormater {
+
+    val yyyy_MM_dd_HHmmss_SSSSSSSSS: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+    val yyyy_MM_dd: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
+
+    // SimpleDateFormat defaults to the JVM time zone; pin these formats to UTC
+    // so the aggregation keys match the UTC-based event decomposition.
+    val yyyyMMddHH: SimpleDateFormat = utcSimpleFormat("yyyyMMddHH")
+    val yyyyMMdd: SimpleDateFormat = utcSimpleFormat("yyyyMMdd")
+    val yyyyMM: SimpleDateFormat = utcSimpleFormat("yyyyMM")
+    val yyyy: SimpleDateFormat = utcSimpleFormat("yyyy")
+
+    private def utcSimpleFormat(pattern: String): SimpleDateFormat = {
+      val format = new SimpleDateFormat(pattern)
+      format.setTimeZone(TimeZone.getTimeZone("UTC"))
+      format
+    }
+  }
+
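+  /** Helpers around epoch times. Epoch values are expected in seconds
+    * and are always resolved against UTC.
+    */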
+  object CommonTimeOperations {
+    def nowEpoch: Long = Instant.now().getEpochSecond
+
+    // java.sql.Timestamp expects milliseconds, while the epochs handled here
+    // are in seconds, hence the * 1000L.
+    def epochToSqlTimestampOps(epochSeconds: Long): Timestamp = new Timestamp(epochSeconds * 1000L)
+
+    def nowSqlTimestamp: Timestamp = epochToSqlTimestampOps(nowEpoch)
+
+    def utcDateTimeFromEpoch(epochSeconds: Long): LocalDateTime = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC)
+  }
+
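+  /** Implicit enrichments for date/time handling; import
+    * AnalyticsTimeOps.implicits._ to bring them into scope.
+    */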
+  object implicits {
+
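+    // Enriches LocalDateTime with explicit UTC conversions.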
+    implicit class LocalDateTimeOps(val localDateTime: LocalDateTime) extends AnyVal {
+      def convertToUTCEpochMillis: Long = localDateTime.atOffset(ZoneOffset.UTC).toInstant.toEpochMilli
+
+      def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
+    }
+
+
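+    // Failure-safe parsing: malformed input yields None instead of throwing.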
+    implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
+      def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
+        Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
+
+      def parseStringToLocalDate(stringFormat: DateTimeFormatter): Option[LocalDate] =
+        Try(LocalDate.parse(dateStr, stringFormat)).toOption
+    }
+
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala b/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
new file mode 100644
index 0000000..c76efdb
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
@@ -0,0 +1,98 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import org.scalatest.{FlatSpec, Matchers}
+
+class AnalyticsTimeOpsSpec extends FlatSpec with Matchers {
+
+
+  "String parser - Given a correct string and date format" should "return an epoch value" in {
+    val epochValueUTC =
+      LocalDateTime
+        .of(2018, 1, 1, 12, 0, 0, 0)
+        .atOffset(ZoneOffset.UTC)
+        .toInstant
+        .toEpochMilli
+
+    val stringDate = "2018-01-01 12:00:00.000000000"
+    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd_HHmmss_SSSSSSSSS
+
+    import AnalyticsTimeOps.implicits._
+
+    stringDate.parseStringToUTCEpoch(dateFormat).get should equal(epochValueUTC)
+  }
+  "String parser - Given a correct string and date format" should "return also a local date" in {
+    val utcLocalDate: LocalDate =
+      LocalDate.of(2018, 1, 1)
+
+    val stringDate = "2018-01-01"
+    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+
+    import AnalyticsTimeOps.implicits._
+
+    stringDate.parseStringToLocalDate(dateFormat).get should equal(utcLocalDate)
+  }
+
+
+  "String parser - An incorrect string a given format" should "return None" in {
+    val stringDate = "2018-01-01 12:00:00.000000000"
+    val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+
+    import AnalyticsTimeOps.implicits._
+    stringDate.parseStringToUTCEpoch(dateFormat) should equal(None)
+  }
+
+
+  "Simple Date Formats" should "convert to the correct strings" in {
+
+    val epochValueUTC =
+      LocalDateTime
+        .of(2018, 1, 1, 12, 0, 0, 0)
+        .atOffset(ZoneOffset.UTC)
+        .toInstant.toEpochMilli
+
+    val yyyyMMddHHStr = "2018010112"
+    val yyyyMMddStr = "20180101"
+    val yyyyMMStr = "201801"
+    val yyyyStr = "2018"
+
+    AnalyticsDateTimeFormater.yyyyMMddHH.format(epochValueUTC) should equal(yyyyMMddHHStr)
+    AnalyticsDateTimeFormater.yyyyMMdd.format(epochValueUTC) should equal(yyyyMMddStr)
+    AnalyticsDateTimeFormater.yyyyMM.format(epochValueUTC) should equal(yyyyMMStr)
+    AnalyticsDateTimeFormater.yyyy.format(epochValueUTC) should equal(yyyyStr)
+  }
+
+
+  "UTC conversion" should "check date operations return always UTC" in {
+    val dateTime =
+      LocalDateTime
+        .of(2018, 1, 1, 12, 0, 0, 0)
+
+    val nonUtcDateTime = dateTime.atOffset(ZoneOffset.ofHours(9))
+    val utcDateTime = dateTime.atOffset(ZoneOffset.UTC)
+
+    import AnalyticsTimeOps.implicits._
+    dateTime.convertToUTCEpochMillis should equal(utcDateTime.toInstant.toEpochMilli)
+    dateTime.convertToUTCEpochMillis should not equal (nonUtcDateTime.toInstant.toEpochMilli)
+
+    dateTime.convertToUTCLocalDateTime should equal(utcDateTime)
+    dateTime.convertToUTCLocalDateTime should not equal (nonUtcDateTime)
+
+  }
+}