// Copyright (C) 2019 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.infrastructure

import com.gerritforge.analytics.SparkTestSupport
import com.gerritforge.analytics.support.ops.ElasticsearchTestITSupport
import org.apache.spark.sql.Dataset
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration._
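
/** Integration test for the "pimped" Elasticsearch writer: `saveToEsWithAliasSwap`
  * writes a Dataset into a fresh index and then swaps the given alias onto it,
  * so that reads through the alias always return the latest written data.
  */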
class ElasticSearchPimpedWriterIT
    extends FlatSpec
    with Matchers
    with BeforeAndAfterAll
    with SparkTestSupport
    with ElasticsearchTestITSupport {
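
  // Point the ES connector at the externally mapped HTTP port of the test Elasticsearch instance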
  override lazy val elasticSearchConnPort: Int = esHostHTTPExtPortMapping

  import ESSparkWriterImplicits.withAliasSwap
  import spark.implicits._

"Saving and reading from same ES alias" must "work while changing indices mapping" in {
val aliasName = "the_alias"
val documentName = "doc"
// Writing into the first index
val dataIntoIndexOne: Dataset[String] = "Content in the first index".split(" ").toList.toDS()
Await.result(dataIntoIndexOne.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
2 seconds)
// Reading from the alias
val resultFromAliasFirst: Dataset[String] =
spark.read.format("es").load(s"$aliasName/$documentName").as[String]
// Written should equal Read
dataIntoIndexOne
.collect()
.toList should contain only (resultFromAliasFirst.collect().toList: _*)
// Writing into the second index
val dataIntoIndexTwo: Dataset[String] = "Content in the second index".split(" ").toList.toDS()
Await.result(dataIntoIndexTwo.saveToEsWithAliasSwap(aliasName, documentName).futureAction,
2 seconds)
// Reading from the alias
val resultFromAliasSecond: Dataset[String] =
spark.read.format("es").load(s"$aliasName/$documentName").as[String]
// Written should equal Read
dataIntoIndexTwo
.collect()
.toList should contain only (resultFromAliasSecond.collect().toList: _*)
}
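
  // The alias swap can fail (e.g. when an ordinary index already occupies the
  // alias name), but the documents written into the fresh index must still be saved.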
"Saving data into ES" must "succeed even if alias creation fails" in {
import org.elasticsearch.spark.sql._
val indexWithAliasName = "alias_name"
val documentName = "doc"
val indexWithAliasNameData = List("This", "index", "will", "make", "alias", "creation", "fail")
indexWithAliasNameData.toDS.saveToEs(s"$indexWithAliasName/$documentName")
val indexWithAliasData = List("An", "index", "with", "alias")
val aliasActionResponse: EnrichedAliasActionResponse =
indexWithAliasData.toDS.saveToEsWithAliasSwap(indexWithAliasName, documentName)
val oldIndexData =
spark.read.format("es").load(s"$indexWithAliasName/$documentName").as[String]
val newIndexData =
spark.read.format("es").load(aliasActionResponse.path).as[String]
assertThrows[NoSuchElementException] {
Await.result(aliasActionResponse.futureAction, 2 seconds)
}
newIndexData.collect().toList should contain only (indexWithAliasData: _*)
oldIndexData.collect().toList should contain only (indexWithAliasNameData: _*)
}
}