blob: 3b5fefcbdb316d69c2d3e89df97c72565fb1da20 [file] [log] [blame]
// Copyright (C) 2018 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.auditlog
import java.sql
import com.gerritforge.analytics.SparkTestSupport
import com.gerritforge.analytics.auditlog.broadcast._
import com.gerritforge.analytics.auditlog.model.{ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
import com.gerritforge.analytics.support.ops.CommonTimeOperations._
import org.apache.spark.sql.Row
import org.scalatest.{FlatSpec, Matchers}
/**
 * Behavioural tests for [[AuditLogsTransformer]]: each test feeds a small RDD of audit
 * events through the transformer and asserts on the aggregated DataFrame rows.
 *
 * Fixes over the previous revision:
 *  - corrected test-name typo ("event if" -> "even if");
 *  - `collect()` results are cached in a local val where they were previously
 *    materialised twice per test (each `collect` triggers a separate Spark job).
 */
class AuditLogsTransformerSpec extends FlatSpec with Matchers with SparkTestSupport with TestFixtures {

  behavior of "AuditLogsTransformer"

  it should "process an anonymous http audit entry" in {
    val events = Seq(anonymousHttpAuditEvent)

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      null, // no user identifier
      null, // no user type
      anonymousHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      anonymousHttpAuditEvent.what,
      null, // no project
      anonymousHttpAuditEvent.result,
      expectedAggregatedCount
    )
  }

  it should "process an authenticated http audit entry where gerrit account couldn't be identified" in {
    val events = Seq(authenticatedHttpAuditEvent)

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      // no identifier mapping supplied, so the raw account id is rendered as a string
      s"${authenticatedHttpAuditEvent.who.get}",
      AdditionalUserInfo.DEFAULT_USER_TYPE,
      authenticatedHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      authenticatedHttpAuditEvent.what,
      null, // no project
      authenticatedHttpAuditEvent.result,
      expectedAggregatedCount
    )
  }

  it should "process an authenticated http audit entry where gerrit account could be identified" in {
    val events               = Seq(authenticatedHttpAuditEvent)
    val gerritUserIdentifier = "Antonio Barone"

    val dataFrame =
      AuditLogsTransformer(GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)))
        .transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      gerritUserIdentifier, // mapped display name replaces the raw account id
      AdditionalUserInfo.DEFAULT_USER_TYPE,
      authenticatedHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      authenticatedHttpAuditEvent.what,
      null, // no project
      authenticatedHttpAuditEvent.result,
      expectedAggregatedCount
    )
  }

  it should "process an SSH audit entry" in {
    val events = Seq(sshAuditEvent)

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
      SshAuditEvent.auditType,
      s"${sshAuditEvent.who.get}",
      AdditionalUserInfo.DEFAULT_USER_TYPE,
      sshAuditEvent.accessPath.get,
      SSH_GERRIT_COMMAND,
      SSH_GERRIT_COMMAND_ARGUMENTS,
      null, // no project
      sshAuditEvent.result,
      expectedAggregatedCount
    )
  }

  it should "group ssh events from the same user together, if they fall within the same time bucket (hour)" in {
    // Second event is 1s later but still inside the same hour bucket.
    val events = Seq(sshAuditEvent, sshAuditEvent.copy(timeAtStart = timeAtStart + 1000))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 2
    dataFrame.collect should contain only Row(
      AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
      SshAuditEvent.auditType,
      s"${sshAuditEvent.who.get}",
      AdditionalUserInfo.DEFAULT_USER_TYPE,
      sshAuditEvent.accessPath.get,
      SSH_GERRIT_COMMAND,
      SSH_GERRIT_COMMAND_ARGUMENTS,
      null, // no project
      sshAuditEvent.result,
      expectedAggregatedCount
    )
  }

  it should "group ssh events from different users separately, even if they fall within the same time bucket (hour)" in {
    val user2Id = sshAuditEvent.who.map(_ + 1)
    val events  = Seq(sshAuditEvent, sshAuditEvent.copy(who = user2Id))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedAggregatedCount = 1
    dataFrame.collect should contain allOf (
      Row(
        AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
        SshAuditEvent.auditType,
        s"${sshAuditEvent.who.get}",
        AdditionalUserInfo.DEFAULT_USER_TYPE,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        null, // no project
        sshAuditEvent.result,
        expectedAggregatedCount
      ),
      Row(
        AuditLogsTransformerSpec.epochMillisToNearestHour(timeAtStart),
        SshAuditEvent.auditType,
        s"${user2Id.get}",
        AdditionalUserInfo.DEFAULT_USER_TYPE,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        null, // no project
        sshAuditEvent.result,
        expectedAggregatedCount
      )
    )
  }

  it should "group different event types separately, even if they fall within the same time bucket (hour)" in {
    val events = Seq(sshAuditEvent, authenticatedHttpAuditEvent.copy(timeAtStart = sshAuditEvent.timeAtStart + 1000))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS
    val expectedSshAggregatedCount  = 1
    val expectedHttpAggregatedCount = 1
    dataFrame.collect should contain allOf (
      Row(
        AuditLogsTransformerSpec.epochMillisToNearestHour(events.head.timeAtStart),
        SshAuditEvent.auditType,
        s"${sshAuditEvent.who.get}",
        AdditionalUserInfo.DEFAULT_USER_TYPE,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        null, // no project
        sshAuditEvent.result,
        expectedSshAggregatedCount
      ),
      Row(
        AuditLogsTransformerSpec.epochMillisToNearestHour(events.last.timeAtStart),
        HttpAuditEvent.auditType,
        s"${authenticatedHttpAuditEvent.who.get}",
        AdditionalUserInfo.DEFAULT_USER_TYPE,
        authenticatedHttpAuditEvent.accessPath.get,
        GIT_UPLOAD_PACK,
        authenticatedHttpAuditEvent.what,
        null, // no project
        authenticatedHttpAuditEvent.result,
        expectedHttpAggregatedCount
      )
    )
  }

  it should "process user type" in {
    val events             = Seq(authenticatedHttpAuditEvent)
    val userType           = "nonDefaultUserType"
    val additionalUserInfo = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)

    val dataFrame = AuditLogsTransformer(
      additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
    ).transform(
      auditEventsRDD  = sc.parallelize(events),
      timeAggregation = "hour"
    )

    // Materialise once: each `collect` would otherwise trigger a separate Spark job.
    val rows = dataFrame.collect
    rows.length shouldBe 1
    rows.head.getAs[String](ElasticSearchFields.USER_TYPE_FIELD) shouldBe userType
  }

  it should "process user type when gerrit account could be identified" in {
    val events               = Seq(authenticatedHttpAuditEvent)
    val gerritUserIdentifier = "Antonio Barone"
    val userType             = "nonDefaultUserType"
    val additionalUserInfo   = AdditionalUserInfo(authenticatedHttpAuditEvent.who.get, userType)

    val dataFrame =
      AuditLogsTransformer(
        gerritIdentifiers = GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier)),
        additionalUsersInfo = AdditionalUsersInfo(Map(authenticatedHttpAuditEvent.who.get -> additionalUserInfo))
      ).transform(
        auditEventsRDD  = sc.parallelize(events),
        timeAggregation = "hour"
      )

    // Materialise once instead of collecting twice.
    val rows = dataFrame.collect
    rows.length shouldBe 1
    rows.head.getAs[String](ElasticSearchFields.USER_TYPE_FIELD) shouldBe userType
  }

  it should "extract gerrit project from an http event" in {
    val events = Seq(authenticatedHttpAuditEvent)

    val dataFrame = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
      .transform(sc.parallelize(events), timeAggregation = "hour")

    // Materialise once instead of collecting twice.
    val rows = dataFrame.collect
    rows.length shouldBe 1
    rows.head.getAs[String](ElasticSearchFields.PROJECT_FIELD) shouldBe project
  }

  it should "extract gerrit project from an ssh event" in {
    val events = Seq(sshAuditEvent)

    val dataFrame = AuditLogsTransformer(gerritProjects = GerritProjects(Map(project -> GerritProject(project))))
      .transform(sc.parallelize(events), timeAggregation = "hour")

    // Materialise once instead of collecting twice.
    val rows = dataFrame.collect
    rows.length shouldBe 1
    rows.head.getAs[String](ElasticSearchFields.PROJECT_FIELD) shouldBe project
  }
}
object AuditLogsTransformerSpec {

  /**
   * Truncates an epoch-millis instant down to the start of its containing hour
   * and converts it to a [[java.sql.Timestamp]], mirroring the transformer's
   * hourly time-bucket aggregation.
   *
   * @param epochMillis instant expressed as milliseconds since the Unix epoch
   * @return SQL timestamp of the enclosing hour boundary
   */
  def epochMillisToNearestHour(epochMillis: Long): sql.Timestamp = {
    val hourInMillis    = 60L * 60L * 1000L
    val flooredToHour   = epochMillis - (epochMillis % hourInMillis)
    epochToSqlTimestampOps(flooredToHour)
  }
}