Add Basic Authentication handling

Allow connecting to a Gerrit instance when Basic Authentication
is required. The username and password are passed as input
parameters.
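
A minimal usage sketch of the GerritConnectivity class introduced by
this change (hostname and credential values below are illustrative):

  // With both credentials supplied, requests go through the
  // authenticated "/a" REST path with a Basic Authorization header;
  // if either one is missing, a plain connection is used.
  val connectivity = new GerritConnectivity(
    Some("gerrit-api-username"), Some("gerrit-api-password"))
  val projects = connectivity.getContentFromApi(
    "http://gerrit.example.com/projects/")
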
Feature: Issue 8836
Change-Id: I8f46ff7a48f73cf8c6323213280a51baeb0a3714
diff --git a/README.md b/README.md
index 6205ee7..b74e19f 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@
--events file:///tmp/gerrit-events-export.json
--writeNotProcessedEventsTo file:///tmp/failed-events
-e gerrit/analytics
+ --username gerrit-api-username
+ --password gerrit-api-password
```
Should ElasticSearch need authentication (i.e.: if X-Pack is enabled), credentials can be
diff --git a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala b/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
new file mode 100644
index 0000000..8417f1d
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
@@ -0,0 +1,69 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.api
+
+import java.net.URL
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.commons.codec.binary.Base64
+
+import scala.io.{BufferedSource, Codec, Source}
+
+
+sealed trait HttpBasicAuthentication {
+
+ val BASIC = "Basic"
+ val AUTHORIZATION = "Authorization"
+
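+ // Illustrative example: encodeCredentials("admin", "secret") returns
+ // "YWRtaW46c2VjcmV0", so getHeader yields "Basic YWRtaW46c2VjcmV0"
+ // (HTTP Basic scheme, RFC 7617).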
+ def encodeCredentials(username: String, password: String): String = {
+ Base64.encodeBase64String((username + ":" + password).getBytes)
+ }
+
+ def getHeader(username: String, password: String): String =
+ BASIC + " " + encodeCredentials(username, password)
+}
+
+class GerritConnectivity(maybeUsername: Option[String], maybePassword: Option[String]) extends HttpBasicAuthentication with Serializable with LazyLogging {
+ private def createBasicSecuredConnection(url: String, username: String, password: String): BufferedSource = {
+ try {
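+ // Gerrit serves authenticated REST endpoints under the "/a" path prefix,
+ // so e.g. "http://host/projects/" is rewritten to "http://host/a/projects/".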
+ val unsecureURL = new URL(url)
+ val endPointPath = unsecureURL.getFile
+ val basicAuthURL = unsecureURL.toString.replace(endPointPath, s"/a$endPointPath")
+
+ logger.info(s"Connecting to API $basicAuthURL with basic auth")
+
+ val connection = new URL(basicAuthURL).openConnection
+ connection.setRequestProperty(AUTHORIZATION, getHeader(username, password))
+ Source.fromInputStream(connection.getInputStream, Codec.UTF8.name)
+ }
+ catch {
+ case e: Exception => throw new Exception(s"Unable to connect to $url", e)
+ }
+ }
+
+ private def createNonSecuredConnection(url: String): BufferedSource = {
+ logger.info(s"Connecting to API $url")
+ Source.fromURL(url, Codec.UTF8.name)
+ }
+
+ def getContentFromApi(url: String): BufferedSource = {
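+ // Basic auth is used only when BOTH username and password are present;
+ // otherwise fall back to an unauthenticated connection.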
+ (
+ for {
+ username <- maybeUsername
+ password <- maybePassword
+ } yield createBasicSecuredConnection(url, username, password)
+ ).getOrElse(createNonSecuredConnection(url))
+ }
+}
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index a34e702..ff75fb8 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -14,18 +14,17 @@
package com.gerritforge.analytics.engine
-import java.io.{BufferedReader, IOException, InputStreamReader}
-import java.net.URL
-import java.nio.charset.StandardCharsets
+import java.io.IOException
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
+import com.gerritforge.analytics.api.GerritConnectivity
import com.gerritforge.analytics.model._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import scala.collection.JavaConverters._
+import scala.io.BufferedSource
object GerritAnalyticsTransformations {
@@ -38,20 +37,18 @@
}
}
- def getLinesFromURL(sourceURL: String): Iterator[String] = {
- val is = new URL(sourceURL).openConnection.getInputStream
- new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
- .lines.iterator().asScala
+ def getProjectJsonContributorsArray(project: String, sourceURL: Option[String], gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
+ sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection))
+ }
+
+
+ def filterEmptyStrings(urlSource: BufferedSource): Iterator[String] =
+ urlSource.getLines()
.filterNot(_.trim.isEmpty)
- }
- def getProjectJsonContributorsArray(project: String, sourceURL: Option[String]): Array[(String, String)] = {
- sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _))
- }
-
- def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String): Array[(String, String)] = {
+ def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String, gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
try {
- getLinesFromURL(sourceURL)
+ filterEmptyStrings(gerritApiConnection.getContentFromApi(sourceURL))
.map(s => (project, s))
.toArray
} catch {
@@ -97,7 +94,7 @@
* Assumes the data frame contains the 'commits' column with an array of CommitInfo in it
* and returns a DataSet[String] with the commits SHA1
*/
- def extractCommits(df: DataFrame)(implicit spark: SparkSession) : Dataset[String] = {
+ def extractCommits(df: DataFrame)(implicit spark: SparkSession): Dataset[String] = {
import spark.implicits._
df
@@ -123,19 +120,19 @@
def handleAliases(aliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
aliasesDF
- .map{ eaDF =>
- val renamedAliasesDF = eaDF
- .withColumnRenamed("email", "email_alias")
- .withColumnRenamed("author", "author_alias")
- .withColumnRenamed("organization", "organization_alias")
+ .map { eaDF =>
+ val renamedAliasesDF = eaDF
+ .withColumnRenamed("email", "email_alias")
+ .withColumnRenamed("author", "author_alias")
+ .withColumnRenamed("organization", "organization_alias")
- df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
- .withColumn("organization",
- when(renamedAliasesDF("organization_alias").notEqual(""), lower(renamedAliasesDF("organization_alias")))
- .otherwise(df("organization")))
- .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
- .drop("email_alias","author_alias", "organization_alias")
- }
+ df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer")
+ .withColumn("organization",
+ when(renamedAliasesDF("organization_alias").notEqual(""), lower(renamedAliasesDF("organization_alias")))
+ .otherwise(df("organization")))
+ .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
+ .drop("email_alias", "author_alias", "organization_alias")
+ }
.getOrElse(df)
}
@@ -146,11 +143,11 @@
def addOrganization()(implicit spark: SparkSession): DataFrame =
df.withColumn("organization", emailToDomainUdf(col("email")))
- def commitSet(implicit spark: SparkSession) : Dataset[String] = {
+ def commitSet(implicit spark: SparkSession): Dataset[String] = {
extractCommits(df)
}
- def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession) : DataFrame = {
+ def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
df
.addOrganization()
.handleAliases(aliasesDFMaybe)
@@ -168,9 +165,9 @@
implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
- def fetchRawContributors(implicit spark: SparkSession): RDD[(String, String)] = {
+ def fetchRawContributors(gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession): RDD[(String, String)] = {
projectsAndUrls.flatMap {
- p => getProjectJsonContributorsArray(p.name, p.contributorsUrl)
+ p => getProjectJsonContributorsArray(p.name, p.contributorsUrl, gerritApiConnection)
}
}
}
@@ -185,12 +182,12 @@
ZoneOffset.UTC, ZoneId.of("Z")
) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
- def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String])(implicit spark: SparkSession) = {
+ def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String], gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession) = {
import spark.sqlContext.implicits._ // toDF
projects
.enrichWithSource(projectToContributorsAnalyticsUrlFactory)
- .fetchRawContributors
+ .fetchRawContributors(gerritApiConnection)
.toDF("project", "json")
.transformCommitterInfo
}
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index b1ba47d..9549155 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -28,7 +28,7 @@
import scopt.Read.reads
import scopt.{OptionParser, Read}
-import scala.io.{Codec, Source}
+import scala.io.Codec
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
@@ -86,6 +86,15 @@
opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
config.copy(eventsFailureOutputPath = Some(failedEventsPath))
} text "location where to write a TSV file containing the events we couldn't process with a description of the reason why"
+
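+ // Optional Gerrit REST credentials: when both are supplied, API calls
+ // go through the authenticated "/a" endpoints (see GerritConnectivity).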
+ opt[String]("username") optional() action { (input, c) =>
+ c.copy(username = Some(input))
+ } text "Gerrit API Username"
+
+ opt[String]("password") optional() action { (input, c) =>
+ c.copy(password = Some(input))
+ } text "Gerrit API Password"
+
}
cliOptionParser.parse(args, GerritEndpointConfig()) match {
@@ -109,7 +118,8 @@
}
}
-trait Job { self: LazyLogging with FetchProjects =>
+trait Job {
+ self: LazyLogging with FetchProjects =>
implicit val codec = Codec.ISO8859
def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -163,7 +173,7 @@
}
val statsFromAnalyticsPlugin =
- getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl)
+ getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl, config.gerritApiConnection)
val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
@@ -208,9 +218,7 @@
}
trait FetchRemoteProjects extends FetchProjects {
-
def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
- config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(url)) }
+ config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(config.gerritApiConnection.getContentFromApi(url)) }
}
-}
-
+}
\ No newline at end of file
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 626a1f7..596e0a5 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -17,6 +17,7 @@
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, ZoneOffset}
+import com.gerritforge.analytics.api.GerritConnectivity
import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
case class GerritEndpointConfig(baseUrl: Option[String] = None,
@@ -28,9 +29,14 @@
aggregate: Option[String] = None,
emailAlias: Option[String] = None,
eventsPath: Option[String] = None,
- eventsFailureOutputPath: Option[String] = None
+ eventsFailureOutputPath: Option[String] = None,
+ username: Option[String] = None,
+ password: Option[String] = None
) {
+
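+ // Connectivity handle built from the optional username/password fields;
+ // used for all Gerrit API reads performed by the job.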
+ val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
+
val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
def queryOpt(opt: (String, Option[String])): Option[String] = {
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 187b5ce..8678f6b 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -14,9 +14,10 @@
package com.gerritforge.analytics
-import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
+import java.io.{ByteArrayInputStream, File, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
+import com.gerritforge.analytics.api.GerritConnectivity
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
import org.apache.spark.sql.Row
@@ -26,7 +27,7 @@
import org.scalatest.{FlatSpec, Inside, Matchers}
import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
@@ -65,6 +66,22 @@
}
}
+ "filterEmptyStrings" should "Filter empty strings from BufferedSource" in {
+ val contentWithEmptyLines =
+ """LineOne
+ |
+ |LineTwo
+ |LineThree
+ """.stripMargin
+ val expectedResult = List("LineOne", "LineTwo", "LineThree")
+
+
+ val inputStream = new ByteArrayInputStream(contentWithEmptyLines.getBytes)
+ val contentWithoutEmptyLines = filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
+
+ contentWithoutEmptyLines.toList should contain only (expectedResult: _*)
+ }
+
"fetchRawContributors" should "fetch file content from the initial list of project names and file names" in {
val line1 = "foo" -> "bar"
@@ -76,7 +93,7 @@
val projectSource2 = ProjectContributionSource("p2", newSource(line3b))
val rawContributors = sc.parallelize(Seq(projectSource1, projectSource2))
- .fetchRawContributors
+ .fetchRawContributors(new GerritConnectivity(None, None))(spark)
.collect
rawContributors should have size (4)
@@ -90,11 +107,11 @@
it should "fetch file content from the initial list of project names and file names with non-latin chars" in {
val rawContributors = sc.parallelize(Seq(ProjectContributionSource("p1", newSource("foo2" -> "bar2\u0100"))))
- .fetchRawContributors
+ .fetchRawContributors(new GerritConnectivity(None, None))
.collect
rawContributors should have size (1)
- rawContributors.head._2 should be ("""{"foo2":"bar2\u0100"}""")
+ rawContributors.head._2 should be("""{"foo2":"bar2\u0100"}""")
}
"transformCommitterInfo" should "transform a DataFrame with project and json to a workable DF with separated columns" in {