blob: 1c208aa52dc536704d6fb9a1e42530cca7c3960f [file] [log] [blame]
// Copyright (C) 2019 GerritForge Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.gerritforge.analytics.infrastructure
import java.time.Instant
import com.gerritforge.analytics.common.api.{ElasticSearchAliasOps, SparkEsClientProvider}
import com.gerritforge.analytics.support.ops.IndexNameGenerator
import com.sksamuel.elastic4s.http.index.admin.AliasActionResponse
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.elasticsearch.spark.sql._
import scala.concurrent.Future
case class EnrichedAliasActionResponse(futureAction: Future[AliasActionResponse], path: String)
object ESSparkWriterImplicits {
implicit def withAliasSwap[T](data: Dataset[T]): ElasticSearchPimpedWriter[T] =
new ElasticSearchPimpedWriter[T](data)
}
class ElasticSearchPimpedWriter[T](data: Dataset[T])
extends ElasticSearchAliasOps
with LazyLogging
with SparkEsClientProvider {
def saveToEsWithAliasSwap(aliasName: String,
documentType: String): EnrichedAliasActionResponse = {
val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
val newPersistencePath = s"$newIndexNameWithTime/$documentType"
logger.info(
s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index")
import scala.concurrent.ExecutionContext.Implicits.global
// Save data
val futureResponse: Future[AliasActionResponse] = try {
data
.toDF()
.saveToEs(newPersistencePath)
logger.info(
s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
moveAliasToNewIndex(aliasName, newIndexNameWithTime).flatMap { response =>
if (response.isSuccess && response.result.success) {
logger.info("Alias was updated successfully")
closeElasticsearchClientConn()
Future.successful(response.result)
} else {
closeElasticsearchClientConn()
logger.error(s"Alias update failed with response result error ${response.error}")
logger.error(s"Alias update failed with ES ACK: ${response.result.acknowledged}")
Future.failed(new Exception(s"Index alias $aliasName update failure ${response.error}"))
}
}
} catch {
case e: Exception =>
Future.failed[AliasActionResponse](e)
}
EnrichedAliasActionResponse(futureResponse, newPersistencePath)
}
override val esSparkSession: SparkSession = data.sparkSession
}