Add Basic Authentication handling

Allow connecting to a Gerrit instance when
Basic Authentication is required. Username and password
need to be passed as input parameters.

Feature: Issue 8836
Change-Id: I8f46ff7a48f73cf8c6323213280a51baeb0a3714
diff --git a/README.md b/README.md
index 6205ee7..b74e19f 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@
     --events file:///tmp/gerrit-events-export.json
     --writeNotProcessedEventsTo file:///tmp/failed-events
     -e gerrit/analytics
+    --username gerrit-api-username
+    --password gerrit-api-password
 ```
 
 Should Elasticsearch need authentication (i.e., if X-Pack is enabled), credentials can be
diff --git a/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala b/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
new file mode 100644
index 0000000..8417f1d
--- /dev/null
+++ b/src/main/scala/com/gerritforge/analytics/api/gerritApiConnectivity.scala
@@ -0,0 +1,69 @@
+// Copyright (C) 2018 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.api
+
+import java.net.URL
+import java.nio.charset.StandardCharsets.UTF_8
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.commons.codec.binary.Base64
+
+import scala.io.{BufferedSource, Codec, Source}
+
+
+sealed trait HttpBasicAuthentication {
+
+  val BASIC = "Basic"
+  val AUTHORIZATION = "Authorization"
+
+  def encodeCredentials(username: String, password: String): String = {
+    Base64.encodeBase64String((username + ":" + password).getBytes(UTF_8))
+  }
+
+  def getHeader(username: String, password: String): String =
+    BASIC + " " + encodeCredentials(username, password)
+}
+
+class GerritConnectivity(maybeUsername: Option[String], maybePassword: Option[String]) extends HttpBasicAuthentication with Serializable with LazyLogging {
+  private def createBasicSecuredConnection(url: String, username: String, password: String): BufferedSource = {
+    try {
+      val unsecureURL = new URL(url)
+      val endPointPath = unsecureURL.getFile
+      val basicAuthURL = unsecureURL.toString.replace(endPointPath, s"/a$endPointPath")
+
+      logger.info(s"Connecting to API $basicAuthURL with basic auth")
+
+      val connection = new URL(basicAuthURL).openConnection
+      connection.setRequestProperty(AUTHORIZATION, getHeader(username, password))
+      Source.fromInputStream(connection.getInputStream, Codec.UTF8.name)
+    }
+    catch {
+      case e: Exception => throw new Exception(s"Unable to connect to $url", e)
+    }
+  }
+
+  private def createNonSecuredConnection(url: String): BufferedSource = {
+    logger.info(s"Connecting to API $url")
+    Source.fromURL(url, Codec.UTF8.name)
+  }
+
+  def getContentFromApi(url: String): BufferedSource = {
+    (for {
+      username <- maybeUsername
+      password <- maybePassword
+    } yield createBasicSecuredConnection(url, username, password))
+      .getOrElse(createNonSecuredConnection(url))
+  }
+}
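
For reviewers, a minimal usage sketch of the new class (the host and endpoint below are illustrative, not part of the patch): with both credentials present, `getContentFromApi` rewrites the request under Gerrit's authenticated `/a` prefix and attaches a Basic `Authorization` header; with either credential missing, it falls back to an anonymous connection.

```scala
import com.gerritforge.analytics.api.GerritConnectivity

// Anonymous client: no credentials, so getContentFromApi falls back to
// a plain Source.fromURL fetch of the given URL.
val anonymous = new GerritConnectivity(None, None)

// Authenticated client: both credentials present, so the endpoint path
// is prefixed with /a and an "Authorization: Basic ..." header is sent.
val authenticated = new GerritConnectivity(Some("user"), Some("secret"))

// Illustrative Gerrit REST endpoint (host made up for this sketch).
val body = authenticated.getContentFromApi("http://gerrit.example.com/projects/").mkString
```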
diff --git a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
index a34e702..ff75fb8 100644
--- a/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
+++ b/src/main/scala/com/gerritforge/analytics/engine/GerritAnalyticsTransformations.scala
@@ -14,18 +14,17 @@
 
 package com.gerritforge.analytics.engine
 
-import java.io.{BufferedReader, IOException, InputStreamReader}
-import java.net.URL
-import java.nio.charset.StandardCharsets
+import java.io.IOException
 import java.time.format.DateTimeFormatter
 import java.time.{LocalDateTime, ZoneId, ZoneOffset, ZonedDateTime}
 
+import com.gerritforge.analytics.api.GerritConnectivity
 import com.gerritforge.analytics.model._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.{udf, _}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
-import scala.collection.JavaConverters._
+import scala.io.BufferedSource
 
 object GerritAnalyticsTransformations {
 
@@ -38,20 +37,18 @@
     }
   }
 
-  def getLinesFromURL(sourceURL: String): Iterator[String] = {
-    val is = new URL(sourceURL).openConnection.getInputStream
-    new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
-      .lines.iterator().asScala
+  def getProjectJsonContributorsArray(project: String, sourceURL: Option[String], gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
+    sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _, gerritApiConnection))
+  }
+
+  def filterEmptyStrings(urlSource: BufferedSource): Iterator[String] =
+    urlSource.getLines()
       .filterNot(_.trim.isEmpty)
-  }
 
-  def getProjectJsonContributorsArray(project: String, sourceURL: Option[String]): Array[(String, String)] = {
-    sourceURL.toArray.flatMap(getProjectJsonContributorsArrayFromUrl(project, _))
-  }
-
-  def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String): Array[(String, String)] = {
+  def getProjectJsonContributorsArrayFromUrl(project: String, sourceURL: String, gerritApiConnection: GerritConnectivity): Array[(String, String)] = {
     try {
-      getLinesFromURL(sourceURL)
+      filterEmptyStrings(gerritApiConnection.getContentFromApi(sourceURL))
         .map(s => (project, s))
         .toArray
     } catch {
@@ -97,7 +94,7 @@
     * Assumes the data frame contains the 'commits' column with an array of CommitInfo in it
     * and returns a DataSet[String] with the commits SHA1
     */
-  def extractCommits(df: DataFrame)(implicit spark: SparkSession) : Dataset[String] = {
+  def extractCommits(df: DataFrame)(implicit spark: SparkSession): Dataset[String] = {
     import spark.implicits._
 
     df
@@ -123,19 +120,19 @@
 
     def handleAliases(aliasesDF: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
       aliasesDF
-        .map{ eaDF =>
-                val renamedAliasesDF = eaDF
-                                        .withColumnRenamed("email", "email_alias")
-                                        .withColumnRenamed("author", "author_alias")
-                                        .withColumnRenamed("organization", "organization_alias")
+        .map { eaDF =>
+          val renamedAliasesDF = eaDF
+            .withColumnRenamed("email", "email_alias")
+            .withColumnRenamed("author", "author_alias")
+            .withColumnRenamed("organization", "organization_alias")
 
-                df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer" )
-                  .withColumn("organization",
-                    when(renamedAliasesDF("organization_alias").notEqual(""), lower(renamedAliasesDF("organization_alias")))
-                      .otherwise(df("organization")))
-                  .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
-                  .drop("email_alias","author_alias", "organization_alias")
-            }
+          df.join(renamedAliasesDF, df("email") === renamedAliasesDF("email_alias"), "left_outer")
+            .withColumn("organization",
+              when(renamedAliasesDF("organization_alias").notEqual(""), lower(renamedAliasesDF("organization_alias")))
+                .otherwise(df("organization")))
+            .withColumn("author", coalesce(renamedAliasesDF("author_alias"), df("author")))
+            .drop("email_alias", "author_alias", "organization_alias")
+        }
         .getOrElse(df)
     }
 
@@ -146,11 +143,11 @@
     def addOrganization()(implicit spark: SparkSession): DataFrame =
       df.withColumn("organization", emailToDomainUdf(col("email")))
 
-    def commitSet(implicit spark: SparkSession) : Dataset[String] = {
+    def commitSet(implicit spark: SparkSession): Dataset[String] = {
       extractCommits(df)
     }
 
-    def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession) : DataFrame = {
+    def dashboardStats(aliasesDFMaybe: Option[DataFrame])(implicit spark: SparkSession): DataFrame = {
       df
         .addOrganization()
         .handleAliases(aliasesDFMaybe)
@@ -168,9 +165,9 @@
 
   implicit class PimpedRDDProjectContributionSource(val projectsAndUrls: RDD[ProjectContributionSource]) extends AnyVal {
 
-    def fetchRawContributors(implicit spark: SparkSession): RDD[(String, String)] = {
+    def fetchRawContributors(gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession): RDD[(String, String)] = {
       projectsAndUrls.flatMap {
-        p => getProjectJsonContributorsArray(p.name, p.contributorsUrl)
+        p => getProjectJsonContributorsArray(p.name, p.contributorsUrl, gerritApiConnection)
       }
     }
   }
@@ -185,12 +182,12 @@
       ZoneOffset.UTC, ZoneId.of("Z")
     ) format DateTimeFormatter.ISO_OFFSET_DATE_TIME
 
-  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String])(implicit spark: SparkSession) = {
+  def getContributorStatsFromAnalyticsPlugin(projects: RDD[GerritProject], projectToContributorsAnalyticsUrlFactory: String => Option[String], gerritApiConnection: GerritConnectivity)(implicit spark: SparkSession) = {
     import spark.sqlContext.implicits._ // toDF
 
     projects
       .enrichWithSource(projectToContributorsAnalyticsUrlFactory)
-      .fetchRawContributors
+      .fetchRawContributors(gerritApiConnection)
       .toDF("project", "json")
       .transformCommitterInfo
   }
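
The change above threads the connection through the whole fetch chain instead of opening raw URLs. A sketch of the resulting call, assuming a `SparkSession` named `spark`, a `Seq[GerritProject]` named `projects`, and a `GerritEndpointConfig` named `config` are in scope:

```scala
import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._

// getContributorStatsFromAnalyticsPlugin now receives the GerritConnectivity
// instance and forwards it to fetchRawContributors, which uses it for every
// ProjectContributionSource instead of calling java.net.URL directly.
val stats = getContributorStatsFromAnalyticsPlugin(
  spark.sparkContext.parallelize(projects), // RDD[GerritProject]
  config.contributorsUrl,                   // String => Option[String]
  config.gerritApiConnection                // the new parameter
)(spark)
```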
diff --git a/src/main/scala/com/gerritforge/analytics/job/Main.scala b/src/main/scala/com/gerritforge/analytics/job/Main.scala
index b1ba47d..9549155 100644
--- a/src/main/scala/com/gerritforge/analytics/job/Main.scala
+++ b/src/main/scala/com/gerritforge/analytics/job/Main.scala
@@ -28,7 +28,7 @@
 import scopt.Read.reads
 import scopt.{OptionParser, Read}
 
-import scala.io.{Codec, Source}
+import scala.io.Codec
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success}
 
@@ -86,6 +86,15 @@
     opt[String]("writeNotProcessedEventsTo") optional() action { (failedEventsPath, config) =>
       config.copy(eventsFailureOutputPath = Some(failedEventsPath))
     } text "location where to write a TSV file containing the events we couldn't process with a description fo the reason why"
+
+    opt[String]("username") optional() action { (input, c) =>
+      c.copy(username = Some(input))
+    } text "Gerrit API Username"
+
+    opt[String]("password") optional() action { (input, c) =>
+      c.copy(password = Some(input))
+    } text "Gerrit API Password"
+
   }
 
   cliOptionParser.parse(args, GerritEndpointConfig()) match {
@@ -109,7 +118,8 @@
   }
 }
 
-trait Job { self: LazyLogging with FetchProjects =>
+trait Job {
+  self: LazyLogging with FetchProjects =>
   implicit val codec = Codec.ISO8859
 
   def buildProjectStats()(implicit config: GerritEndpointConfig, spark: SparkSession): DataFrame = {
@@ -163,7 +173,7 @@
     }
 
     val statsFromAnalyticsPlugin =
-      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl)
+      getContributorStatsFromAnalyticsPlugin(spark.sparkContext.parallelize(projects), configWithOverriddenUntil.contributorsUrl, config.gerritApiConnection)
 
     val statsFromEvents = getContributorStatsFromGerritEvents(repositoryAlteringEvents, statsFromAnalyticsPlugin.commitSet.rdd, aggregationStrategy)
 
@@ -208,9 +218,7 @@
 }
 
 trait FetchRemoteProjects extends FetchProjects {
-
   def fetchProjects(config: GerritEndpointConfig): Seq[GerritProject] = {
-    config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(Source.fromURL(url)) }
+    config.gerritProjectsUrl.toSeq.flatMap { url => GerritProjectsSupport.parseJsonProjectListResponse(config.gerritApiConnection.getContentFromApi(url)) }
   }
-}
-
+}
\ No newline at end of file
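
A sketch of how the two new scopt flags land in the configuration (flag names are from this patch; the sample values are placeholders, and it assumes access to the `cliOptionParser` defined in `Main` plus whatever other options the existing parser marks as required):

```scala
// Parsing --username/--password fills the two new Option[String] fields
// of GerritEndpointConfig; omitting both leaves them None and keeps the
// previous anonymous behaviour.
val parsed = cliOptionParser.parse(
  Array("--username", "gerrit-api-username", "--password", "gerrit-api-password"),
  GerritEndpointConfig()
)
// parsed.flatMap(_.username) == Some("gerrit-api-username")
```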
diff --git a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
index 626a1f7..596e0a5 100644
--- a/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
+++ b/src/main/scala/com/gerritforge/analytics/model/GerritEndpointConfig.scala
@@ -17,6 +17,7 @@
 import java.time.format.DateTimeFormatter
 import java.time.{LocalDate, ZoneOffset}
 
+import com.gerritforge.analytics.api.GerritConnectivity
 import com.gerritforge.analytics.support.ops.AnalyticsTimeOps.AnalyticsDateTimeFormater
 
 case class GerritEndpointConfig(baseUrl: Option[String] = None,
@@ -28,9 +29,14 @@
                                 aggregate: Option[String] = None,
                                 emailAlias: Option[String] = None,
                                 eventsPath: Option[String] = None,
-                                eventsFailureOutputPath: Option[String] = None
+                                eventsFailureOutputPath: Option[String] = None,
+                                username: Option[String] = None,
+                                password: Option[String] = None
                                ) {
 
+
+  val gerritApiConnection: GerritConnectivity = new GerritConnectivity(username, password)
+
   val gerritProjectsUrl: Option[String] = baseUrl.map { url => s"${url}/projects/" + prefix.fold("")("?p=" + _) }
 
   def queryOpt(opt: (String, Option[String])): Option[String] = {
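
For illustration (values made up): the derived `gerritApiConnection` is built once from the two optional credentials, so copying them into an existing config is enough to switch every API call made through it to Basic Authentication.

```scala
import com.gerritforge.analytics.model.GerritEndpointConfig

// No credentials: the derived GerritConnectivity stays anonymous.
val anonymous = GerritEndpointConfig(baseUrl = Some("http://gerrit.example.com"))

// Both credentials set: the copy rebuilds gerritApiConnection, which now
// authenticates each request.
val authenticated = anonymous.copy(
  username = Some("gerrit-api-username"),
  password = Some("gerrit-api-password")
)
```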
diff --git a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
index 187b5ce..8678f6b 100644
--- a/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
+++ b/src/test/scala/com/gerritforge/analytics/GerritAnalyticsTransformationsSpec.scala
@@ -14,9 +14,10 @@
 
 package com.gerritforge.analytics
 
-import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
+import java.io.{ByteArrayInputStream, File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 
+import com.gerritforge.analytics.api.GerritConnectivity
 import com.gerritforge.analytics.engine.GerritAnalyticsTransformations._
 import com.gerritforge.analytics.model.{GerritProject, GerritProjectsSupport, ProjectContributionSource}
 import org.apache.spark.sql.Row
@@ -26,7 +27,7 @@
 import org.scalatest.{FlatSpec, Inside, Matchers}
 
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 class GerritAnalyticsTransformationsSpec extends FlatSpec with Matchers with SparkTestSupport with Inside {
 
@@ -65,6 +66,22 @@
     }
   }
 
+  "filterEmptyStrings" should "Filter empty strings from BufferedSource" in {
+    val contentWithEmptyLines =
+      """LineOne
+        |
+        |LineTwo
+        |LineThree
+      """.stripMargin
+    val expectedResult = List("LineOne", "LineTwo", "LineThree")
+
+    val inputStream = new ByteArrayInputStream(contentWithEmptyLines.getBytes)
+    val contentWithoutEmptyLines = filterEmptyStrings(Source.fromInputStream(inputStream, Codec.UTF8.name))
+
+    contentWithoutEmptyLines.toList should contain only (expectedResult: _*)
+  }
+
   "fetchRawContributors" should "fetch file content from the initial list of project names and file names" in {
 
     val line1 = "foo" -> "bar"
@@ -76,7 +93,7 @@
     val projectSource2 = ProjectContributionSource("p2", newSource(line3b))
 
     val rawContributors = sc.parallelize(Seq(projectSource1, projectSource2))
-      .fetchRawContributors
+      .fetchRawContributors(new GerritConnectivity(None, None))(spark)
       .collect
 
     rawContributors should have size (4)
@@ -90,11 +107,11 @@
 
   it should "fetch file content from the initial list of project names and file names with non-latin chars" in {
     val rawContributors = sc.parallelize(Seq(ProjectContributionSource("p1", newSource("foo2" -> "bar2\u0100"))))
-      .fetchRawContributors
+      .fetchRawContributors(new GerritConnectivity(None, None))
       .collect
 
     rawContributors should have size (1)
-    rawContributors.head._2 should be ("""{"foo2":"bar2\u0100"}""")
+    rawContributors.head._2 should be("""{"foo2":"bar2\u0100"}""")
   }
 
   "transformCommitterInfo" should "transform a DataFrame with project and json to a workable DF with separated columns" in {