blob: 9cdc721a97a5b254ea31e9f19683496090525766 [file] [log] [blame]
// Copyright (C) 2018 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.auditlog.spark.dataframe.ops
import com.gerritforge.analytics.auditlog.broadcast.{
AdditionalUsersInfo,
GerritProjects,
GerritUserIdentifiers
}
import com.gerritforge.analytics.auditlog.spark.sql.udf.SparkExtractors.{
extractCommandArgumentsUDF,
extractCommandUDF,
extractSubCommandUDF
}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
/** Extension methods for audit-log `DataFrame`s.
  *
  * Import `DataFrameOps._` to make the helpers of [[DataFrameOps.PimpedAuditLogDataFrame]]
  * available directly on any `DataFrame`.
  */
object DataFrameOps {

  /** Wraps a `DataFrame` and adds helpers that derive extra columns from it or aggregate it.
    *
    * Every helper returns a new `DataFrame`; the wrapped one is only read.
    *
    * @param dataFrame the frame the helpers operate on
    */
  implicit class PimpedAuditLogDataFrame(dataFrame: DataFrame) {

    // A column is considered present when resolving it against the wrapped frame does not throw.
    private def columnExists(path: String) = Try(dataFrame.col(path)).isSuccess

    // `expression` is by-name on purpose: it is only evaluated when `column` exists, so a UDF
    // built inside it is never constructed for frames lacking that column.
    private def columnOrNull(column: String, expression: => Column) =
      if (!columnExists(column)) lit(null) else expression

    /** Adds (or replaces) `userIdentifierCol` with the identifier looked up from the `who` column.
      *
      * When the frame has no `who` column the new column is a null literal.
      *
      * @param userIdentifierCol name of the column to write
      * @param gerritAccounts    lookup queried with each `who` value via `getIdentifier`
      * @return the frame with the identifier column applied
      */
    def hydrateWithUserIdentifierColumn(
        userIdentifierCol: String,
        gerritAccounts: GerritUserIdentifiers
    ): DataFrame = {
      def identifierLookup: UserDefinedFunction =
        udf((accountId: Int) => gerritAccounts.getIdentifier(accountId))

      val identifier = columnOrNull("who", identifierLookup(col("who")))
      dataFrame.withColumn(userIdentifierCol, identifier)
    }

    /** Adds (or replaces) `timeBucketCol` with `time_at_start` truncated to `timeAggregation`.
      *
      * @param timeBucketCol   name of the column to write
      * @param timeAggregation truncation unit handed to Spark's `date_trunc`
      * @return the frame with the time-bucket column applied
      */
    def withTimeBucketColumn(timeBucketCol: String, timeAggregation: String): DataFrame = {
      // NOTE(review): dividing by 1000 suggests `time_at_start` holds epoch milliseconds,
      // since `from_unixtime` works in seconds - confirm against the producer of this column.
      val startInSeconds = col("time_at_start") / 1000
      val bucket         = date_trunc(timeAggregation, from_unixtime(startInSeconds))
      dataFrame.withColumn(timeBucketCol, bucket)
    }

    /** Adds (or replaces) the command column and the command-arguments column.
      *
      * Both are computed from `what` and `access_path`; the command additionally receives
      * `http_method`, or a null literal when the frame has no such column.
      *
      * @param commandCol     name of the column receiving the extracted command
      * @param commandArgsCol name of the column receiving the extracted command arguments
      * @return the frame with both columns applied
      */
    def withCommandColumns(commandCol: String, commandArgsCol: String): DataFrame = {
      val what       = col("what")
      val accessPath = col("access_path")
      val httpMethod = columnOrNull("http_method", col("http_method"))

      // The command column is applied first and the arguments column on top of that result,
      // mirroring the order in which the columns are expected to be added.
      val withCommand =
        dataFrame.withColumn(commandCol, extractCommandUDF(what, accessPath, httpMethod))
      withCommand.withColumn(commandArgsCol, extractCommandArgumentsUDF(what, accessPath))
    }

    /** Adds (or replaces) `subCommandCol` with the sub-command extracted from `what` and
      * `access_path`.
      *
      * @param subCommandCol name of the column to write
      * @return the frame with the sub-command column applied
      */
    def withSubCommandColumns(subCommandCol: String): DataFrame = {
      val subCommand = extractSubCommandUDF(col("what"), col("access_path"))
      dataFrame.withColumn(subCommandCol, subCommand)
    }

    /** Adds (or replaces) `projectCol` with the project extracted from `what` and `access_path`.
      *
      * @param projectCol     name of the column to write
      * @param gerritProjects extractor invoked per row via `extractProject`
      * @return the frame with the project column applied
      */
    def withProjectColumn(projectCol: String, gerritProjects: GerritProjects): DataFrame = {
      def projectLookup: UserDefinedFunction =
        udf((what: String, accessPath: String) => gerritProjects.extractProject(what, accessPath))

      val project = projectLookup(col("what"), col("access_path"))
      dataFrame.withColumn(projectCol, project)
    }

    /** Adds (or replaces) a column holding the user type looked up from the `who` column.
      *
      * When the frame has no `who` column the new column is a null literal.
      *
      * @param commandCol          name of the column to write; despite its name it receives the
      *                            user type, not a command
      * @param additionalUsersInfo lookup queried with each `who` value via `getUserType`
      * @return the frame with the user-type column applied
      */
    def withUserTypeColumn(
        commandCol: String,
        additionalUsersInfo: AdditionalUsersInfo
    ): DataFrame = {
      def userTypeLookup: UserDefinedFunction =
        udf((accountId: Int) => additionalUsersInfo.getUserType(accountId))

      val userType = columnOrNull("who", userTypeLookup(col("who")))
      dataFrame.withColumn(commandCol, userType)
    }

    /** Groups the frame by `cols` and counts the rows of each group.
      *
      * @param numEventsCol name given to the count column
      * @param cols         names of the grouping columns
      * @return one row per group, carrying the grouping columns plus `numEventsCol`
      */
    def aggregateNumEventsColumn(numEventsCol: String, cols: List[String]): DataFrame = {
      val groupingColumns = cols.map(name => col(name))
      val eventCount      = count("*").alias(numEventsCol)
      dataFrame.groupBy(groupingColumns: _*).agg(eventCount)
    }
  }
}