blob: dcc9f9211b9c1b9de39d037244d31a6ec0f1ea99 [file] [log] [blame]
// Copyright (C) 2018 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.auditlog
import java.sql
import com.gerritforge.analytics.SparkTestSupport
import com.gerritforge.analytics.auditlog.broadcast.GerritUserIdentifiers
import com.gerritforge.analytics.auditlog.model.{ElasticSearchFields, HttpAuditEvent, SshAuditEvent}
import com.gerritforge.analytics.auditlog.spark.AuditLogsTransformer
import com.gerritforge.analytics.support.ops.CommonTimeOperations._
import org.apache.spark.sql.Row
import org.scalatest.{FlatSpec, Matchers}
/**
  * Specification for `AuditLogsTransformer.transform` when aggregating audit events with
  * `timeAggregation = "hour"`.
  *
  * Every test follows the same shape:
  *  1. build a small sequence of audit events from the `TestFixtures` values,
  *  2. run it through the transformer,
  *  3. check that the resulting columns are exactly `ElasticSearchFields.ALL_DOCUMENT_FIELDS`,
  *  4. compare the collected rows with the expected ones.
  *
  * Each expected row is built positionally from: the event time truncated to the hour, the audit
  * type, the user identifier, the access path, two command-related values and the number of
  * events aggregated into that row.
  */
class AuditLogsTransformerSpec extends FlatSpec with Matchers with SparkTestSupport with TestFixtures {
  import AuditLogsTransformerSpec.epochMillisToNearestHour

  behavior of "AuditLogsTransformer"

  it should "process an anonymous http audit entry" in {
    val events = Seq(anonymousHttpAuditEvent)

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      null, // no user identifier
      anonymousHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      anonymousHttpAuditEvent.what,
      expectedAggregatedCount
    )
  }

  it should "process an authenticated http audit entry where gerrit account couldn't be identified" in {
    val events = Seq(authenticatedHttpAuditEvent)

    // No user identifiers are supplied, so the raw `who` value is expected in the user column.
    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      authenticatedHttpAuditEvent.who.get.toString,
      authenticatedHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      authenticatedHttpAuditEvent.what,
      expectedAggregatedCount
    )
  }

  it should "process an authenticated http audit entry where gerrit account could be identified" in {
    val events               = Seq(authenticatedHttpAuditEvent)
    val gerritUserIdentifier = "Antonio Barone"

    // Map the event's `who` value to a known identifier; that identifier is expected in the row.
    val userIdentifiers = GerritUserIdentifiers(Map(authenticatedHttpAuditEvent.who.get -> gerritUserIdentifier))
    val dataFrame =
      AuditLogsTransformer(userIdentifiers).transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      epochMillisToNearestHour(timeAtStart),
      HttpAuditEvent.auditType,
      gerritUserIdentifier,
      authenticatedHttpAuditEvent.accessPath.get,
      GIT_UPLOAD_PACK,
      authenticatedHttpAuditEvent.what,
      expectedAggregatedCount
    )
  }

  it should "process an SSH audit entry" in {
    val events = Seq(sshAuditEvent)

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 1
    dataFrame.collect should contain only Row(
      epochMillisToNearestHour(timeAtStart),
      SshAuditEvent.auditType,
      sshAuditEvent.who.get.toString,
      sshAuditEvent.accessPath.get,
      SSH_GERRIT_COMMAND,
      SSH_GERRIT_COMMAND_ARGUMENTS,
      expectedAggregatedCount
    )
  }

  it should "group ssh events from the same user together, if they fall within the same time bucket (hour)" in {
    // The second event starts one second after the first one.
    val events = Seq(sshAuditEvent, sshAuditEvent.copy(timeAtStart = timeAtStart + 1000))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 2
    dataFrame.collect should contain only Row(
      epochMillisToNearestHour(timeAtStart),
      SshAuditEvent.auditType,
      sshAuditEvent.who.get.toString,
      sshAuditEvent.accessPath.get,
      SSH_GERRIT_COMMAND,
      SSH_GERRIT_COMMAND_ARGUMENTS,
      expectedAggregatedCount
    )
  }

  it should "group ssh events from different users separately, even if they fall within the same time bucket (hour)" in {
    val user2Id = sshAuditEvent.who.map(_ + 1)
    val events  = Seq(sshAuditEvent, sshAuditEvent.copy(who = user2Id))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedAggregatedCount = 1
    // `only` (rather than `allOf`) also fails the test when unexpected extra rows are produced.
    dataFrame.collect should contain only (
      Row(
        epochMillisToNearestHour(timeAtStart),
        SshAuditEvent.auditType,
        sshAuditEvent.who.get.toString,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        expectedAggregatedCount
      ),
      Row(
        epochMillisToNearestHour(timeAtStart),
        SshAuditEvent.auditType,
        user2Id.get.toString,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        expectedAggregatedCount
      )
    )
  }

  it should "group different event types separately, even if they fall within the same time bucket (hour)" in {
    // The HTTP event starts one second after the SSH event.
    val events = Seq(sshAuditEvent, authenticatedHttpAuditEvent.copy(timeAtStart = sshAuditEvent.timeAtStart + 1000))

    val dataFrame = AuditLogsTransformer().transform(sc.parallelize(events), timeAggregation = "hour")

    dataFrame.columns should contain theSameElementsAs ElasticSearchFields.ALL_DOCUMENT_FIELDS

    val expectedSshAggregatedCount  = 1
    val expectedHttpAggregatedCount = 1
    // `only` (rather than `allOf`) also fails the test when unexpected extra rows are produced.
    dataFrame.collect should contain only (
      Row(
        epochMillisToNearestHour(events.head.timeAtStart),
        SshAuditEvent.auditType,
        sshAuditEvent.who.get.toString,
        sshAuditEvent.accessPath.get,
        SSH_GERRIT_COMMAND,
        SSH_GERRIT_COMMAND_ARGUMENTS,
        expectedSshAggregatedCount
      ),
      Row(
        epochMillisToNearestHour(events.last.timeAtStart),
        HttpAuditEvent.auditType,
        authenticatedHttpAuditEvent.who.get.toString,
        authenticatedHttpAuditEvent.accessPath.get,
        GIT_UPLOAD_PACK,
        authenticatedHttpAuditEvent.what,
        expectedHttpAggregatedCount
      )
    )
  }
}
/** Helpers shared by the `AuditLogsTransformerSpec` tests. */
object AuditLogsTransformerSpec {

  // Width of one hourly bucket, in milliseconds. Kept as a Long so all arithmetic stays in Long.
  private val MillisInOneHour: Long = 60L * 60L * 1000L

  /**
    * Truncates an epoch timestamp to the beginning of the hour it belongs to.
    *
    * Despite the method name, the value is always rounded down to the start of the hour,
    * never up to the following one.
    *
    * @param epochMillis timestamp expressed in milliseconds since the epoch
    * @return the result of `epochToSqlTimestampOps` applied to the truncated milliseconds
    */
  def epochMillisToNearestHour(epochMillis: Long): sql.Timestamp =
    // floorMod keeps the remainder non-negative, so timestamps before the epoch are also
    // rounded down to the start of their hour (plain `%` would round them towards zero).
    epochToSqlTimestampOps(epochMillis - Math.floorMod(epochMillis, MillisInOneHour))
}