blob: 01e6dc5388aac88b7c60ad698c26eee73583e9c4 [file] [log] [blame]
package com.gerritforge.analytics.engine
import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.URL
import java.nio.charset.StandardCharsets
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
import com.gerritforge.analytics.model.{GerritEndpointConfig, ProjectContribution, ProjectContributionSource}
import org.apache.spark.rdd.RDD
import org.json4s.JsonAST.{JField, JInt, JString}
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.collection.JavaConverters._
object GerritAnalyticsTrasformations {
private[analytics] def longDateToISO(in: Number): String =
ZonedDateTime.ofInstant(
LocalDateTime.ofEpochSecond(in.longValue() / 1000L, 0, ZoneOffset.UTC),
ZoneOffset.UTC, ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
private[analytics] val emailToDomain: PartialFunction[JValue,String] = {
case JString(email) if email.contains("@") && !email.endsWith("@") =>
email.split("@").last.toLowerCase
}
private[analytics] def transformLongDateToISO(in: String): JObject = {
parse(in).transformField {
case JField(fieldName, JInt(v)) if (fieldName=="date" || fieldName=="last_commit_date") =>
JField(fieldName, JString(longDateToISO(v)))
}.asInstanceOf[JObject]
}
private[analytics] def transformAddOrganization(in: JObject): JObject = {
Some(in \ "email")
.collect(emailToDomain)
.fold(in)(org => ("organization" -> org) ~ in)
}
def getFileContentAsProjectContributions(sourceUrl: String, projectName: String): Iterator[ProjectContribution] = {
val is = new URL(sourceUrl).openConnection.getInputStream
new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
.lines.iterator().asScala
.filterNot(_.trim.isEmpty)
.map(transformLongDateToISO)
.map(transformAddOrganization)
.map(ProjectContribution(projectName, _))
}
implicit class PimpedRddOfProjectContributionSource(val rdd: RDD[ProjectContributionSource]) extends AnyVal {
def fetchContributors: RDD[ProjectContribution] = {
rdd.flatMap {
case ProjectContributionSource(projectName, sourceUrl) =>
try {
getFileContentAsProjectContributions(sourceUrl, projectName)
} catch {
case ioex: IOException => None
}
}
}
}
implicit class PimpedRddOfProjects(val rdd: RDD[String]) extends AnyVal {
def enrichWithSource(config: GerritEndpointConfig) = {
rdd.map { projectName =>
ProjectContributionSource(projectName, config.contributorsUrl(projectName))
}
}
}
implicit class PimpedRddOfProjects2Json(val rdd: RDD[ProjectContribution]) extends AnyVal {
def toJson() = {
rdd.map(pc =>
compact(render(
("project" -> pc.projectName) ~ pc.authorContribution)
)
)
}
}
}