Add ES integration test support

Use Testcontainers to spin up the Docker containers
required by the Elasticsearch integration tests.
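
For reference, a minimal sketch of how a suite opts into the
containerised Elasticsearch instance (the suite name here is
illustrative; the traits are the ones added by this change):

  import com.gerritforge.analytics.SparkTestSupport
  import com.gerritforge.analytics.support.ops.ElasticsearchTestITSupport
  import org.scalatest.FlatSpec

  class MyEsWriterIT extends FlatSpec
      with SparkTestSupport
      with ElasticsearchTestITSupport {
    // Point the Spark ES connector at the container's mapped HTTP port.
    override lazy val elasticSearchConnPort: Int = esHostHTTPExtPortMapping
  }

ForAllTestContainer starts the container once before the suite runs
and stops it afterwards, so tests only ever see the mapped host port.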

Feature: Issue 10555
Change-Id: I59e22a505b9de92c91f66499a664e0a0f81424bf
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 612d563..158435d 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -57,6 +57,7 @@
       }
 
       import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+      import scala.concurrent.ExecutionContext.Implicits.global
       spark
         .getEventsFromPath(config.eventsPath.get)
         .transformEvents(
@@ -67,6 +68,9 @@
           TimeRange(config.since, config.until)
         )
         .saveToEsWithAliasSwap(config.elasticSearchIndex.get, DOCUMENT_TYPE)
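+        // Log the outcome of the asynchronous alias swap; the result future is not awaited.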
+        .futureAction
+        .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+        .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
 
     case None =>
       logger.error("Could not parse command line arguments")
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
index a030424..57beb76 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
@@ -35,4 +35,9 @@
     RestClient.builder(new HttpHost(esCfg.getNodes, esCfg.getPort, "http")).build()
 
   lazy val esClient: ElasticClient = ElasticClient.fromRestClient(restClient)
-}
\ No newline at end of file
+
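+  // Close both the elastic4s client and the underlying low-level REST client.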
+  def closeElasticsearchClientConn(): Unit = {
+    esClient.close()
+    restClient.close()
+  }
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
index 12a6d94..1c208aa 100644
--- a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
@@ -23,7 +23,8 @@
 import org.elasticsearch.spark.sql._
 
 import scala.concurrent.Future
-import scala.util.Try
+
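+// Couples the asynchronous alias-swap result with the index path the data was written to.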
+case class EnrichedAliasActionResponse(futureAction: Future[AliasActionResponse], path: String)
 
 object ESSparkWriterImplicits {
   implicit def withAliasSwap[T](data: Dataset[T]): ElasticSearchPimpedWriter[T] =
@@ -36,7 +37,7 @@
     with SparkEsClientProvider {
 
   def saveToEsWithAliasSwap(aliasName: String,
-                            documentType: String): Future[Option[AliasActionResponse]] = {
+                            documentType: String): EnrichedAliasActionResponse = {
     val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
     val newPersistencePath   = s"$newIndexNameWithTime/$documentType"
 
@@ -45,35 +46,30 @@
 
     import scala.concurrent.ExecutionContext.Implicits.global
     // Save data
-    Try(
+    val futureResponse: Future[AliasActionResponse] = try {
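+      // saveToEs is a blocking Spark action: write failures surface here as
+      // exceptions, which the catch below turns into a failed Future.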
       data
         .toDF()
         .saveToEs(newPersistencePath)
-    ).map { _ =>
-        logger.info(
-          s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
 
-        val idxSwapResult: Future[Option[AliasActionResponse]] =
-          moveAliasToNewIndex(aliasName, newIndexNameWithTime).map { response =>
-            response.isSuccess match {
-              case true =>
-                response.result.success match {
-                  case true =>
-                    logger.info("Alias was updated successfully")
-                    Some(response.result)
-                  case false =>
-                    logger.error(
-                      s"Alias update failed with response result error ${response.result}")
-                    None
-                }
-              case false =>
-                logger.error(s"Alias update failed with response error ${response.error.`type`}")
-                None
-            }
-          }
-        idxSwapResult
+      logger.info(
+        s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
+      moveAliasToNewIndex(aliasName, newIndexNameWithTime).flatMap { response =>
+        if (response.isSuccess && response.result.success) {
+          logger.info("Alias was updated successfully")
+          closeElasticsearchClientConn()
+          Future.successful(response.result)
+        } else {
+          closeElasticsearchClientConn()
+          // response.error throws on a success and response.result throws on a
+          // failure, so inspect isSuccess before reading either side.
+          val reason =
+            if (response.isSuccess) s"response result error ${response.result}"
+            else s"response error ${response.error.`type`}"
+          logger.error(s"Alias update failed with $reason")
+          Future.failed(new Exception(s"Index alias $aliasName update failure: $reason"))
+        }
       }
-      .getOrElse(Future(None))
+    } catch {
+      case e: Exception =>
+        Future.failed[AliasActionResponse](e)
+    }
+    EnrichedAliasActionResponse(futureResponse, newPersistencePath)
   }
 
   override val esSparkSession: SparkSession = data.sparkSession
diff --git a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index cc28598..dd58538 100644
--- a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -19,8 +19,15 @@
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
 trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
-  implicit val spark: SparkSession = SparkSession
+
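+  // Defaults for a local ES instance; integration suites override the port
+  // with the Testcontainers-mapped one.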
+  lazy val elasticSearchConnPort: Int    = 9200
+  lazy val elasticSearchConnHost: String = "localhost"
+
+  implicit lazy val spark: SparkSession = SparkSession
     .builder()
+    .config("es.nodes.wan.only", "true")
+    .config("es.nodes", elasticSearchConnHost)
+    .config("es.port", elasticSearchConnPort)
     .master("local[4]")
     .getOrCreate()
 
diff --git a/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
new file mode 100644
index 0000000..272bc8c
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.infrastructure
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.support.ops.ElasticsearchTestITSupport
+import org.apache.spark.sql.Dataset
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class ElasticSearchPimpedWriterIT
+    extends FlatSpec
+    with Matchers
+    with BeforeAndAfterAll
+    with SparkTestSupport
+    with ElasticsearchTestITSupport {
+
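+  // Redirect the Spark ES connector to the container's dynamically mapped port.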
+  override lazy val elasticSearchConnPort: Int = esHostHTTPExtPortMapping
+  import ESSparkWriterImplicits.withAliasSwap
+  import spark.implicits._
+
+  "Saving and reading from same ES alias" must "work while changing indices mapping" in {
+
+    val aliasName    = "the_alias"
+    val documentName = "doc"
+
+    // Writing into the first index
+    val dataIntoIndexOne: Dataset[String] = "Content in the first index".split(" ").toList.toDS()
+    Await.result(dataIntoIndexOne.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+                 2.seconds)
+    // Reading from the alias
+    val resultFromAliasFirst: Dataset[String] =
+      spark.read.format("es").load(s"$aliasName/$documentName").as[String]
+
+    // Written should equal Read
+    dataIntoIndexOne
+      .collect()
+      .toList should contain only (resultFromAliasFirst.collect().toList: _*)
+
+    // Writing into the second index
+    val dataIntoIndexTwo: Dataset[String] = "Content in the second index".split(" ").toList.toDS()
+    Await.result(dataIntoIndexTwo.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+                 2.seconds)
+    // Reading from the alias
+    val resultFromAliasSecond: Dataset[String] =
+      spark.read.format("es").load(s"$aliasName/$documentName").as[String]
+
+    // Written should equal Read
+    dataIntoIndexTwo
+      .collect()
+      .toList should contain only (resultFromAliasSecond.collect().toList: _*)
+  }
+
+  "Saving data into ES" must "succeed even if alias creation fails" in {
+    import org.elasticsearch.spark.sql._
+    val indexWithAliasName     = "alias_name"
+    val documentName           = "doc"
+    val indexWithAliasNameData = List("This", "index", "will", "make", "alias", "creation", "fail")
+
+    indexWithAliasNameData.toDS.saveToEs(s"$indexWithAliasName/$documentName")
+
+    val indexWithAliasData = List("An", "index", "with", "alias")
+    val aliasActionResponse: EnrichedAliasActionResponse =
+      indexWithAliasData.toDS.saveToEsWithAliasSwap(indexWithAliasName, documentName)
+
+    val oldIndexData =
+      spark.read.format("es").load(s"$indexWithAliasName/$documentName").as[String]
+    val newIndexData =
+      spark.read.format("es").load(aliasActionResponse.path).as[String]
+
+    // The alias swap fails because an index named "alias_name" already exists,
+    // so the future completes with the failure raised by the writer.
+    assertThrows[Exception] {
+      Await.result(aliasActionResponse.futureAction, 2.seconds)
+    }
+    newIndexData.collect().toList should contain only (indexWithAliasData: _*)
+    oldIndexData.collect().toList should contain only (indexWithAliasNameData: _*)
+  }
+}
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala
new file mode 100644
index 0000000..a1213ff
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy
+
+trait ElasticsearchTestITSupport extends ForAllTestContainer with BeforeAndAfterAll { this: Suite =>
+
+  lazy val elasticsearchHTTPIntPort: Int = 9200
+  lazy val elasticsearchTCPIntPort: Int  = 9300
+
+  lazy val esHostHTTPExtPortMapping: Int = container.mappedPort(elasticsearchHTTPIntPort)
+  lazy val esHostTCPExtPortMapping: Int  = container.mappedPort(elasticsearchTCPIntPort)
+
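+  // Single-node ES 6.5.1 container; exposed ports are mapped to random free host ports.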
+  override lazy val container = GenericContainer(
+    "elasticsearch:6.5.1",
+    waitStrategy = new HostPortWaitStrategy,
+    env = Map("discovery.type" -> "single-node"),
+    exposedPorts = Seq(elasticsearchHTTPIntPort, elasticsearchTCPIntPort)
+  )
+}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 947220c..9b8ad85 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -14,7 +14,6 @@
 
 package com.gerritforge.analytics.gitcommits.job
 
-
 import java.time.LocalDate
 
 import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
@@ -90,7 +89,6 @@
 
   cliOptionParser.parse(args, GerritEndpointConfig()) match {
     case Some(config) =>
-
       implicit val _: GerritEndpointConfig = config
 
       logger.info(s"Starting analytics app with config $config")
@@ -132,9 +130,13 @@
   }
 
   def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
+    import scala.concurrent.ExecutionContext.Implicits.global
     config.elasticIndex.foreach { esIndex =>
       import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
       df.saveToEsWithAliasSwap(esIndex, indexType)
+        .futureAction
+        .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+        .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
     }
 
   }
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index 62d101a..4bb9a97 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -102,8 +102,12 @@
           s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'")
         stdout.flush()
         import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+        import scala.concurrent.ExecutionContext.Implicits.global
         projectStats
           .saveToEsWithAliasSwap(esIndex, indexType)
+          .futureAction
+          .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+          .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
       }
 
       val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index cf9ccd3..53dd76f 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -43,7 +43,8 @@
       "com.typesafe.scala-logging" %% "scala-logging"          % scalaLogging,
       "com.github.scopt"           %% "scopt"                  % scopt,
       "org.scalactic"              %% "scalactic"              % scalactic % "test",
-      "org.scalatest"              %% "scalatest"              % scalaTest % "test"
+      "org.scalatest"              %% "scalatest"              % scalaTest % "test",
+      "com.dimafeng"               %% "testcontainers-scala"   % TestContainersScala % Test
     ) ++ elastic4s
   )
 
@@ -105,4 +106,5 @@
   val scalactic = "3.0.1"
   val scalaTest = "3.0.1"
   val json4s = "3.2.11"
+  val testcontainersScala = "0.23.0"
 }