Add ES integration test support
Use Testcontainers to spin up Elasticsearch Docker
containers for the integration tests.
Feature: Issue 10555
Change-Id: I59e22a505b9de92c91f66499a664e0a0f81424bf
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 612d563..158435d 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -57,6 +57,7 @@
}
import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+ import scala.concurrent.ExecutionContext.Implicits.global
spark
.getEventsFromPath(config.eventsPath.get)
.transformEvents(
@@ -67,6 +68,9 @@
TimeRange(config.since, config.until)
)
.saveToEsWithAliasSwap(config.elasticSearchIndex.get, DOCUMENT_TYPE)
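+ // Log the outcome of the asynchronous alias swap once it completes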
+ .futureAction
+ .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+ .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
case None =>
logger.error("Could not parse command line arguments")
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
index a030424..57beb76 100644
--- a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
@@ -35,4 +35,9 @@
RestClient.builder(new HttpHost(esCfg.getNodes, esCfg.getPort, "http")).build()
lazy val esClient: ElasticClient = ElasticClient.fromRestClient(restClient)
-}
\ No newline at end of file
+
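+ // Close both the elastic4s client and the underlying low-level REST client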
+ def closeElasticsearchClientConn(): Unit = {
+ esClient.close()
+ restClient.close()
+ }
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
index 12a6d94..1c208aa 100644
--- a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
+++ b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
@@ -23,7 +23,8 @@
import org.elasticsearch.spark.sql._
import scala.concurrent.Future
-import scala.util.Try
+
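+// Pairs the async alias-swap result with the concrete "index/type" path the
+// data was written to, so callers can address the new index directly.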
+case class EnrichedAliasActionResponse(futureAction: Future[AliasActionResponse], path: String)
object ESSparkWriterImplicits {
implicit def withAliasSwap[T](data: Dataset[T]): ElasticSearchPimpedWriter[T] =
@@ -36,7 +37,7 @@
with SparkEsClientProvider {
def saveToEsWithAliasSwap(aliasName: String,
- documentType: String): Future[Option[AliasActionResponse]] = {
+ documentType: String): EnrichedAliasActionResponse = {
val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
val newPersistencePath = s"$newIndexNameWithTime/$documentType"
@@ -45,35 +46,30 @@
import scala.concurrent.ExecutionContext.Implicits.global
// Save data
- Try(
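+ // Turn a synchronous save failure into a failed Future instead of throwing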
+ val futureResponse: Future[AliasActionResponse] = try {
data
.toDF()
.saveToEs(newPersistencePath)
- ).map { _ =>
- logger.info(
- s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
- val idxSwapResult: Future[Option[AliasActionResponse]] =
- moveAliasToNewIndex(aliasName, newIndexNameWithTime).map { response =>
- response.isSuccess match {
- case true =>
- response.result.success match {
- case true =>
- logger.info("Alias was updated successfully")
- Some(response.result)
- case false =>
- logger.error(
- s"Alias update failed with response result error ${response.result}")
- None
- }
- case false =>
- logger.error(s"Alias update failed with response error ${response.error.`type`}")
- None
- }
- }
- idxSwapResult
+ logger.info(
+ s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
+ moveAliasToNewIndex(aliasName, newIndexNameWithTime).flatMap { response =>
+ if (response.isSuccess && response.result.success) {
+ logger.info("Alias was updated successfully")
+ closeElasticsearchClientConn()
+ Future.successful(response.result)
+ } else {
+ closeElasticsearchClientConn()
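+ // NOTE: in elastic4s, response.result throws NoSuchElementException on a failed
+ // response (and response.error does on a successful one), so this branch can
+ // fail the Future with that exception instead of the Exception built below.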
+ logger.error(s"Alias update failed with response result error ${response.error}")
+ logger.error(s"Alias update failed with ES ACK: ${response.result.acknowledged}")
+ Future.failed(new Exception(s"Index alias $aliasName update failure ${response.error}"))
+ }
}
- .getOrElse(Future(None))
+ } catch {
+ case e: Exception =>
+ Future.failed[AliasActionResponse](e)
+ }
+ EnrichedAliasActionResponse(futureResponse, newPersistencePath)
}
override val esSparkSession: SparkSession = data.sparkSession
diff --git a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index cc28598..dd58538 100644
--- a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -19,8 +19,15 @@
import org.scalatest.{BeforeAndAfterAll, Suite}
trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
- implicit val spark: SparkSession = SparkSession
+
+ lazy val elasticSearchConnPort: Int = 9200
+ lazy val elasticSearchConnHost: String = "localhost"
+
+ implicit lazy val spark: SparkSession = SparkSession
.builder()
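+ // es.nodes.wan.only restricts the connector to the declared host/port, which
+ // is required when Elasticsearch is only reachable through a mapped Docker port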
+ .config("es.nodes.wan.only", "true")
+ .config("es.nodes", elasticSearchConnHost)
+ .config("es.port", elasticSearchConnPort)
.master("local[4]")
.getOrCreate()
diff --git a/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
new file mode 100644
index 0000000..272bc8c
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/infrastructure/ElasticSearchPimpedWriterIT.scala
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.infrastructure
+import com.gerritforge.analytics.SparkTestSupport
+import com.gerritforge.analytics.support.ops.ElasticsearchTestITSupport
+import org.apache.spark.sql.Dataset
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class ElasticSearchPimpedWriterIT
+ extends FlatSpec
+ with Matchers
+ with BeforeAndAfterAll
+ with SparkTestSupport
+ with ElasticsearchTestITSupport {
+
+ override lazy val elasticSearchConnPort: Int = esHostHTTPExtPortMapping
+ import ESSparkWriterImplicits.withAliasSwap
+ import spark.implicits._
+
+ "Saving and reading from same ES alias" must "work while changing indices mapping" in {
+
+ val aliasName = "the_alias"
+ val documentName = "doc"
+
+ // Writing into the first index
+ val dataIntoIndexOne: Dataset[String] = "Content in the first index".split(" ").toList.toDS()
+ Await.result(dataIntoIndexOne.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+ 2 seconds)
+ // Reading from the alias
+ val resultFromAliasFirst: Dataset[String] =
+ spark.read.format("es").load(s"$aliasName/$documentName").as[String]
+
+ // Written should equal Read
+ dataIntoIndexOne
+ .collect()
+ .toList should contain only (resultFromAliasFirst.collect().toList: _*)
+
+ // Writing into the second index
+ val dataIntoIndexTwo: Dataset[String] = "Content in the second index".split(" ").toList.toDS()
+ Await.result(dataIntoIndexTwo.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
+ 2 seconds)
+ // Reading from the alias
+ val resultFromAliasSecond: Dataset[String] =
+ spark.read.format("es").load(s"$aliasName/$documentName").as[String]
+
+ // Written should equal Read
+ dataIntoIndexTwo
+ .collect()
+ .toList should contain only (resultFromAliasSecond.collect().toList: _*)
+ }
+
+ "Saving data into ES" must "succeed even if alias creation fails" in {
+ import org.elasticsearch.spark.sql._
+ val indexWithAliasName = "alias_name"
+ val documentName = "doc"
+ val indexWithAliasNameData = List("This", "index", "will", "make", "alias", "creation", "fail")
+
+ indexWithAliasNameData.toDS.saveToEs(s"$indexWithAliasName/$documentName")
+
+ val indexWithAliasData = List("An", "index", "with", "alias")
+ val aliasActionResponse: EnrichedAliasActionResponse =
+ indexWithAliasData.toDS.saveToEsWithAliasSwap(indexWithAliasName, documentName)
+
+ val oldIndexData =
+ spark.read.format("es").load(s"$indexWithAliasName/$documentName").as[String]
+ val newIndexData =
+ spark.read.format("es").load(aliasActionResponse.path).as[String]
+
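+ // The swap fails because an index already exists under the alias name; the
+ // failure surfaces as the NoSuchElementException noted in ElasticSearchPimpedWriter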
+ assertThrows[NoSuchElementException] {
+ Await.result(aliasActionResponse.futureAction, 2 seconds)
+ }
+ newIndexData.collect().toList should contain only (indexWithAliasData: _*)
+ oldIndexData.collect().toList should contain only (indexWithAliasNameData: _*)
+ }
+}
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala
new file mode 100644
index 0000000..a1213ff
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/ElasticsearchTestITSupport.scala
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy
+
+trait ElasticsearchTestITSupport extends ForAllTestContainer with BeforeAndAfterAll { this: Suite =>
+
+ lazy val elasticsearchHTTPIntPort: Int = 9200
+ lazy val elasticsearchTCPIntPort: Int = 9300
+
+ lazy val esHostHTTPExtPortMapping: Int = container.mappedPort(elasticsearchHTTPIntPort)
+ lazy val esHostTCPExtPortMapping: Int = container.mappedPort(elasticsearchTCPIntPort)
+
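+ // discovery.type=single-node lets the node elect itself master and skip the
+ // production bootstrap checks, so no extra cluster configuration is needed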
+ override lazy val container = GenericContainer(
+ "elasticsearch:6.5.1",
+ waitStrategy = new HostPortWaitStrategy,
+ env = Map("discovery.type" -> "single-node"),
+ exposedPorts = Seq(elasticsearchHTTPIntPort, elasticsearchTCPIntPort)
+ )
+}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 947220c..9b8ad85 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -14,7 +14,6 @@
package com.gerritforge.analytics.gitcommits.job
-
import java.time.LocalDate
import com.gerritforge.analytics.gitcommits.model.{GerritEndpointConfig, GerritProject, GerritProjectsSupport}
@@ -90,7 +89,6 @@
cliOptionParser.parse(args, GerritEndpointConfig()) match {
case Some(config) =>
-
implicit val _: GerritEndpointConfig = config
logger.info(s"Starting analytics app with config $config")
@@ -132,9 +130,13 @@
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
+ import scala.concurrent.ExecutionContext.Implicits.global
config.elasticIndex.foreach { esIndex =>
import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
df.saveToEsWithAliasSwap(esIndex, indexType)
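+ // Log the outcome of the asynchronous alias swap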
+ .futureAction
+ .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+ .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
}
}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index 62d101a..4bb9a97 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -102,8 +102,12 @@
s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'")
stdout.flush()
import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+ import scala.concurrent.ExecutionContext.Implicits.global
projectStats
.saveToEsWithAliasSwap(esIndex, indexType)
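+ // Log the outcome of the asynchronous alias swap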
+ .futureAction
+ .map(actionResponse => logger.info(s"Completed index swap $actionResponse"))
+ .recover { case exception: Exception => logger.error(s"Index swap failed $exception") }
}
val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index cf9ccd3..53dd76f 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -43,7 +43,8 @@
"com.typesafe.scala-logging" %% "scala-logging" % scalaLogging,
"com.github.scopt" %% "scopt" % scopt,
"org.scalactic" %% "scalactic" % scalactic % "test",
- "org.scalatest" %% "scalatest" % scalaTest % "test"
+ "org.scalatest" %% "scalatest" % scalaTest % "test",
+ "com.dimafeng" %% "testcontainers-scala" % TestContainersScala % Test
) ++ elastic4s
)
@@ -105,4 +106,5 @@
val scalactic = "3.0.1"
val scalaTest = "3.0.1"
val json4s = "3.2.11"
+ val TestContainersScala = "0.23.0"
}