blob: 9ae3b0cfcb092887f22d8228138fc220fd21fdc4 [file] [log] [blame]
// Copyright (C) 2018 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.auditlog.spark.dataframe.ops
import com.gerritforge.analytics.auditlog.broadcast.{AdditionalUsersInfo, GerritProjects, GerritUserIdentifiers}
import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{extractCommandArgumentsUDF, extractCommandUDF, extractSubCommandUDF}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
/**
 * Spark `DataFrame` extensions used to enrich and aggregate Gerrit audit-log events.
 *
 * Bring [[PimpedAuditLogDataFrame]] into scope with `import DataFrameOps._`.
 */
object DataFrameOps {

  /**
   * Audit-log column builders available on any `DataFrame`.
   *
   * Each method returns a new `DataFrame` built with `withColumn` / `groupBy`.
   *
   * @param dataFrame the audit-log frame being enriched
   */
  implicit class PimpedAuditLogDataFrame(dataFrame: DataFrame) {

    /** True when `path` can be resolved as a column of the wrapped frame. */
    private def resolvable(path: String): Boolean =
      Try(dataFrame.col(path)).isSuccess

    /**
     * Yields `column` when `source` resolves against the wrapped frame, otherwise a `null` literal.
     * `column` is passed by name, so it is only built when `source` is present.
     */
    private def whenPresentOrNull(source: String)(column: => Column): Column =
      if (!resolvable(source)) lit(null) else column

    /**
     * Adds `userIdentifierCol` by mapping the `who` column (read as an `Int`) through
     * `gerritAccounts.getIdentifier`. When the frame has no `who` column the new column is null.
     *
     * @param userIdentifierCol name of the column to add (replaced if it already exists)
     * @param gerritAccounts    lookup that turns a `who` id into an identifier
     */
    def hydrateWithUserIdentifierColumn(userIdentifierCol: String, gerritAccounts: GerritUserIdentifiers): DataFrame = {
      val identifier = whenPresentOrNull("who") {
        val toIdentifier: UserDefinedFunction = udf((who: Int) => gerritAccounts.getIdentifier(who))
        toIdentifier(col("who"))
      }
      dataFrame.withColumn(userIdentifierCol, identifier)
    }

    /**
     * Adds `timeBucketCol`, the `time_at_start` instant truncated to `timeAggregation`.
     *
     * `time_at_start` is divided by 1000 before `from_unixtime`, which expects seconds since the
     * epoch. The raw value is therefore presumably in milliseconds (TODO confirm against the producer).
     *
     * @param timeBucketCol   name of the column to add
     * @param timeAggregation unit understood by Spark's `date_trunc`, e.g. "hour" or "day"
     */
    def withTimeBucketColumn(timeBucketCol: String, timeAggregation: String): DataFrame = {
      val startSeconds: Column = col("time_at_start") / 1000
      val bucket: Column = date_trunc(timeAggregation, from_unixtime(startSeconds))
      dataFrame.withColumn(timeBucketCol, bucket)
    }

    /**
     * Adds the command and command-argument columns derived from `what` and `access_path`.
     *
     * The command extractor also receives `http_method` when the frame has it, and null otherwise.
     *
     * @param commandCol     name of the column receiving the extracted command
     * @param commandArgsCol name of the column receiving the extracted command arguments
     */
    def withCommandColumns(commandCol: String, commandArgsCol: String): DataFrame = {
      val what       = col("what")
      val accessPath = col("access_path")
      // http_method is optional in the input; a null literal stands in when it is missing.
      val httpMethod = whenPresentOrNull("http_method")(col("http_method"))

      dataFrame
        .withColumn(commandCol, extractCommandUDF(what, accessPath, httpMethod))
        .withColumn(commandArgsCol, extractCommandArgumentsUDF(what, accessPath))
    }

    /**
     * Adds the sub-command column derived from `what` and `access_path`.
     *
     * @param subCommandCol name of the column to add
     */
    def withSubCommandColumns(subCommandCol: String): DataFrame = {
      val subCommand = extractSubCommandUDF(col("what"), col("access_path"))
      dataFrame.withColumn(subCommandCol, subCommand)
    }

    /**
     * Adds `projectCol`, computed by `gerritProjects.extractProject` from `what` and `access_path`.
     *
     * @param projectCol     name of the column to add
     * @param gerritProjects project lookup used by the extraction
     */
    def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
      val projectOf: UserDefinedFunction =
        udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))
      dataFrame.withColumn(projectCol, projectOf(col("what"), col("access_path")))
    }

    /**
     * Adds a user-type column by mapping the `who` column (read as an `Int`) through
     * `additionalUsersInfo.getUserType`. When the frame has no `who` column the new column is null.
     *
     * @param commandCol          despite its name, the output column that receives the user type
     * @param additionalUsersInfo lookup that turns a `who` id into a user type
     */
    def withUserTypeColumn(commandCol: String, additionalUsersInfo: AdditionalUsersInfo): DataFrame = {
      val userType = whenPresentOrNull("who") {
        val toUserType: UserDefinedFunction = udf((who: Int) => additionalUsersInfo.getUserType(who))
        toUserType(col("who"))
      }
      dataFrame.withColumn(commandCol, userType)
    }

    /**
     * Groups rows by `cols` and counts the rows of each group into `numEventsCol`.
     *
     * With an empty `cols`, Spark aggregates the whole frame into a single row.
     *
     * @param numEventsCol name of the count column in the result
     * @param cols         names of the grouping columns
     */
    def aggregateNumEventsColumn(numEventsCol: String, cols: List[String]): DataFrame = {
      val groupingColumns: List[Column] = cols.map(col)
      val numEvents: Column = count("*").as(numEventsCol)
      dataFrame.groupBy(groupingColumns: _*).agg(numEvents)
    }
  }
}