Create separate package for date and time operations
- Move date time operations into its own package
- Test date time operations
- Create reusable and tested date time formatters
Bug: Issue 8833
Change-Id: Iba2581bee0819d5c446c2ddf279f5d9bc3a500a9
diff --git a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
index 9b5990a..95aa113 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/events/AggregationStrategy.scala
@@ -14,8 +14,10 @@
package com.gerritforge.analytics.engine.events
-import java.text.{DateFormat, SimpleDateFormat}
-import java.time.{LocalDateTime, ZoneOffset}
+import java.text.DateFormat
+import java.time.LocalDateTime
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.{AnalyticsDateTimeFormater, CommonTimeOperations}
import scala.util.Try
@@ -25,7 +27,7 @@
case class DateTimeParts(year: Integer, month: Integer, day: Integer, hour: Integer)
def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts = {
- val date = LocalDateTime.ofEpochSecond(event.eventCreatedOn, 0, ZoneOffset.UTC)
+ val date: LocalDateTime = CommonTimeOperations.utcDateTimeFromEpoch(event.eventCreatedOn)
DateTimeParts(date.getYear, date.getMonthValue, date.getDayOfMonth, date.getHour)
}
}
@@ -58,27 +60,28 @@
}
object aggregateByEmailAndHour extends EmailAndTimeBasedAggregation {
- val dateFormat = new SimpleDateFormat("yyyyMMddHH")
+ val dateFormat = AnalyticsDateTimeFormater.yyyyMMddHH
}
object aggregateByEmailAndDay extends EmailAndTimeBasedAggregation {
- val dateFormat = new SimpleDateFormat("yyyyMMdd")
+ val dateFormat = AnalyticsDateTimeFormater.yyyyMMdd
override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
super.decomposeTimeOfAggregatedEvent(event).copy(hour = 0)
}
object aggregateByEmailAndMonth extends EmailAndTimeBasedAggregation {
- val dateFormat = new SimpleDateFormat("yyyyMM")
+ val dateFormat = AnalyticsDateTimeFormater.yyyyMM
override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
super.decomposeTimeOfAggregatedEvent(event).copy(day = 0, hour = 0)
}
object aggregateByEmailAndYear extends EmailAndTimeBasedAggregation {
- val dateFormat = new SimpleDateFormat("yyyy")
+ val dateFormat = AnalyticsDateTimeFormater.yyyy
override def decomposeTimeOfAggregatedEvent(event: GerritJsonEvent): DateTimeParts =
super.decomposeTimeOfAggregatedEvent(event).copy(month = 0, day = 0, hour = 0)
}
+
}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index dc7be1a..3d1c4d4 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -14,12 +14,13 @@
package com.gerritforge.analytics.job
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneId}
+import java.time.LocalDate
import com.gerritforge.analytics.engine.events.GerritEventsTransformations.NotParsableJsonEvent
import com.gerritforge.analytics.engine.events.{AggregationStrategy, EventParser, GerritJsonEvent}
import com.gerritforge.analytics.model.{GerritEndpointConfig, GerritProjectsSupport}
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -41,13 +42,14 @@
}
}
- implicit val localDateRead: Read[LocalDate] = reads { str =>
- val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+ implicit val localDateRead: Read[LocalDate] = reads { dateStr =>
+ val cliDateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
try {
- LocalDate.parse(str, format)
+ import AnalyticsTimeOps.implicits._
+ dateStr.parseStringToLocalDate(cliDateFormat).get
} catch {
case NonFatal(e) =>
- throw new IllegalArgumentException(s"Invalid date '$str' expected format is '$format'")
+ throw new IllegalArgumentException(s"Invalid date '$dateStr' expected format is '${cliDateFormat}'")
}
}
@@ -81,7 +83,7 @@
opt[String]("events") optional() action { (eventsPath, config) =>
config.copy(eventsPath = Some(eventsPath))
} text "location where to load the Gerrit Events"
- opt[String]("writeNotProcessedEventsTo") optional() action{ (failedEventsPath, config) =>
+ opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
config.copy(eventsFailureOutputPath = Some(failedEventsPath))
} text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
}
@@ -107,7 +109,8 @@
}
}
-trait Job { self: LazyLogging =>
+trait Job {
+ self: LazyLogging =>
implicit val codec = Codec.ISO8859
def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -128,7 +131,7 @@
val projects = GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(config.gerritProjectsUrl))
- logger.info(s"Loaded a list of ${projects.size} projects ${if(projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
+ logger.info(s"Loaded a list of ${projects.size} projects ${if (projects.size > 20) projects.take(20).mkString("[", ",", ", ...]") else projects.mkString("[", ",", "]")}")
val aliasesDF = getAliasDF(config.emailAlias)
@@ -136,7 +139,7 @@
val failedEvents: RDD[NotParsableJsonEvent] = events.collect { case Left(eventFailureDescription) => eventFailureDescription }
- if(!failedEvents.isEmpty()) {
+ if (!failedEvents.isEmpty()) {
config.eventsFailureOutputPath.foreach { failurePath =>
logger.info(s"Events failures will be stored at '$failurePath'")
@@ -148,11 +151,11 @@
//We might want to use the time of the events as information to feed to the collection of data from the repository
val repositoryAlteringEvents = events.collect { case Right(event) => event }.repositoryWithNewRevisionEvents
- val firstEventDateMaybe: Option[LocalDate] = if(repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
+ val firstEventDateMaybe: Option[LocalDate] = if (repositoryAlteringEvents.isEmpty()) None else Some(repositoryAlteringEvents.earliestEventTime.toLocalDate)
val configWithOverriddenUntil = firstEventDateMaybe.fold(config) { firstEventDate =>
val lastAggregationDate = firstEventDate.plusMonths(1)
- if(lastAggregationDate.isBefore(LocalDate.now())) {
+ if (lastAggregationDate.isBefore(LocalDate.now())) {
logger.info(s"Overriding 'until' date '${config.until}' with '$lastAggregationDate' since events ara available until $firstEventDate")
config.copy(until = Some(lastAggregationDate))
} else {
@@ -167,17 +170,17 @@
require(statsFromAnalyticsPlugin.schema == statsFromEvents.schema,
s""" Schemas from the stats collected from events and from the analytics datasets differs!!
- | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
- | From gerrit events: ${statsFromEvents.schema}
+ | From analytics plugin: ${statsFromAnalyticsPlugin.schema}
+ | From gerrit events: ${statsFromEvents.schema}
""".stripMargin)
(statsFromAnalyticsPlugin union statsFromEvents).dashboardStats(aliasesDF)
}
- def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent,GerritJsonEvent]] = { // toDF
+ def loadEvents(implicit config: GerritEndpointConfig, spark: SparkSession): RDD[Either[NotParsableJsonEvent, GerritJsonEvent]] = { // toDF
import com.gerritforge.analytics.engine.events.GerritEventsTransformations._
- config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent,GerritJsonEvent]]) { eventsPath =>
+ config.eventsPath.fold(spark.sparkContext.emptyRDD[Either[NotParsableJsonEvent, GerritJsonEvent]]) { eventsPath =>
spark
.read.textFile(eventsPath).rdd
.parseEvents(EventParser)
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 899e56a..c3ea2be 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -15,7 +15,9 @@
package com.gerritforge.analytics.model
import java.time.format.DateTimeFormatter
-import java.time.{LocalDate, ZoneId}
+import java.time.{LocalDate, ZoneOffset}
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
case class GerritEndpointConfig(baseUrl: String = "",
prefix: Option[String] = None,
@@ -38,7 +40,7 @@
}
@transient
- private lazy val format = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"))
+ private lazy val format: DateTimeFormatter = AnalyticsDateTimeFormater.yyyy_MM_dd.withZone(ZoneOffset.UTC)
val queryString = Seq("since" -> since.map(format.format), "until" -> until.map(format.format), "aggregate" -> aggregate)
.flatMap(queryOpt).mkString("?", "&", "")
diff --git a/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
new file mode 100644
index 0000000..f99697f
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOps.scala
@@ -0,0 +1,70 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+package com.gerritforge.analytics.support.ops
+
+import java.time.format.DateTimeFormatter
+import java.time.{LocalDateTime, ZoneOffset}
+
+package AnalyticsTimeOps {
+
+ import java.sql.Timestamp
+ import java.text.SimpleDateFormat
+ import java.time.{Instant, LocalDate, OffsetDateTime}
+
+ import scala.util.Try
+
+
+ object AnalyticsDateTimeFormater {
+
+ val yyyy_MM_dd_HHmmss_SSSSSSSSS: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+ val yyyy_MM_dd: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
+
+ val yyyyMMddHH: SimpleDateFormat = new SimpleDateFormat("yyyyMMddHH")
+ val yyyyMMdd: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+ val yyyyMM: SimpleDateFormat = new SimpleDateFormat("yyyyMM")
+ val yyyy: SimpleDateFormat = new SimpleDateFormat("yyyy")
+ }
+
+ object CommonTimeOperations {
+ def nowEpoch: Long = Instant.now().getEpochSecond
+
+ def epochToSqlTimestampOps(epoch: Long) = new Timestamp(epoch)
+
+ def nowSqlTimestmap: Timestamp = epochToSqlTimestampOps(nowEpoch)
+
+ def utcDateTimeFromEpoch(epoch: Long): LocalDateTime = LocalDateTime.ofEpochSecond(epoch, 0, ZoneOffset.UTC)
+ }
+
+ object implicits {
+
+ implicit class LocalDateTimeOps(val localDateTime: LocalDateTime) extends AnyVal {
+ def convertToUTCEpochMillis: Long = localDateTime.atOffset(ZoneOffset.UTC).toInstant.toEpochMilli
+
+ def convertToUTCLocalDateTime: OffsetDateTime = localDateTime.atOffset(ZoneOffset.UTC)
+ }
+
+
+ implicit class StringToTimeParsingOps(val dateStr: String) extends AnyVal {
+ def parseStringToUTCEpoch(stringFormat: DateTimeFormatter): Option[Long] =
+ Try(LocalDateTime.parse(dateStr, stringFormat).convertToUTCEpochMillis).toOption
+
+ def parseStringToLocalDate(stringFormat: DateTimeFormatter): Option[LocalDate] =
+ Try(LocalDate.parse(dateStr, stringFormat)).toOption
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala b/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
new file mode 100644
index 0000000..c76efdb
--- /dev/null
+++ b/src/test/scala/com/gerritforge/analytics/support/ops/AnalyticsTimeOpsSpec.scala
@@ -0,0 +1,98 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+
+import java.time.{LocalDate, LocalDateTime, ZoneOffset}
+
+import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
+import org.scalatest.{FlatSpec, Matchers}
+
+class AnalyticsTimeOpsSpec extends FlatSpec with Matchers {
+
+
+ "String parser - Given a correct string and date format" should "return an epoch value" in {
+ val epochValueUTC =
+ LocalDateTime
+ .of(2018, 1, 1, 12, 0, 0, 0)
+ .atOffset(ZoneOffset.UTC)
+ .toInstant
+ .toEpochMilli
+
+ val stringDate = "2018-01-01 12:00:00.000000000"
+ val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd_HHmmss_SSSSSSSSS
+
+ import AnalyticsTimeOps.implicits._
+
+ stringDate.parseStringToUTCEpoch(dateFormat).get should equal(epochValueUTC)
+ }
+ "String parser - Given a correct string and date format" should "return also a local date" in {
+ val utcLocalDate: LocalDate =
+ LocalDate.of(2018, 1, 1)
+
+ val stringDate = "2018-01-01"
+ val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+
+ import AnalyticsTimeOps.implicits._
+
+ stringDate.parseStringToLocalDate(dateFormat).get should equal(utcLocalDate)
+ }
+
+
+ "String parser - An incorrect string a given format" should "return None" in {
+ val stringDate = "2018-01-01 12:00:00.000000000"
+ val dateFormat = AnalyticsDateTimeFormater.yyyy_MM_dd
+
+ import AnalyticsTimeOps.implicits._
+ stringDate.parseStringToUTCEpoch(dateFormat) should equal(None)
+ }
+
+
+ "Simple Date Formats" should "convert to the correct strings" in {
+
+ val epochValueUTC =
+ LocalDateTime
+ .of(2018, 1, 1, 12, 0, 0, 0)
+ .atOffset(ZoneOffset.UTC)
+ .toInstant.toEpochMilli
+
+ val yyyyMMddHHStr = "2018010112"
+ val yyyyMMddStr = "20180101"
+ val yyyyMMStr = "201801"
+ val yyyyStr = "2018"
+
+ AnalyticsDateTimeFormater.yyyyMMddHH.format(epochValueUTC) should equal(yyyyMMddHHStr)
+ AnalyticsDateTimeFormater.yyyyMMdd.format(epochValueUTC) should equal(yyyyMMddStr)
+ AnalyticsDateTimeFormater.yyyyMM.format(epochValueUTC) should equal(yyyyMMStr)
+ AnalyticsDateTimeFormater.yyyy.format(epochValueUTC) should equal(yyyyStr)
+ }
+
+
+ "UTC conversion" should "check date operations return always UTC" in {
+ val dateTime =
+ LocalDateTime
+ .of(2018, 1, 1, 12, 0, 0, 0)
+
+ val etcDateTime = dateTime.atOffset(ZoneOffset.ofHours(9))
+ val utcDateTime = dateTime.atOffset(ZoneOffset.UTC)
+
+ import AnalyticsTimeOps.implicits._
+ dateTime.convertToUTCEpochMillis should equal(utcDateTime.toInstant.toEpochMilli)
+ dateTime.convertToUTCEpochMillis should not equal (etcDateTime.toInstant.toEpochMilli)
+
+ dateTime.convertToUTCLocalDateTime should equal(utcDateTime)
+ dateTime.convertToUTCLocalDateTime should not equal (etcDateTime)
+
+ }
+}