No downtime when rebuilding ES indices

Introduce Elasticsearch aliases so that readers keep querying a stable
alias while the actual indices are rebuilt behind it.
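
Each run writes to a fresh time-based index and, once the write
succeeds, swaps the alias over in a single atomic "aliases" request, so
searches never hit an empty or half-built index. A minimal elastic4s
sketch of the swap (alias and index names are illustrative):

  import com.sksamuel.elastic4s.http.ElasticDsl._

  esClient.execute {
    aliases(
      List(
        removeAlias("gerrit") on "gerrit_2019-01-01_1546300800000",
        addAlias("gerrit") on "gerrit_2019-01-02_1546387200000"
      )
    )
  }
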
Feature: Issue 10555
Change-Id: I43477b9d763b28839f002493e8caf78057a8e756
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 0557983..612d563 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -23,7 +23,6 @@
import com.gerritforge.analytics.common.api.GerritConnectivity
import com.gerritforge.analytics.spark.SparkApp
import com.typesafe.scalalogging.LazyLogging
-import org.elasticsearch.spark.sql._
object Main extends SparkApp with App with LazyLogging {
override val appName = "Gerrit AuditLog Analytics ETL"
@@ -57,6 +56,7 @@
sys.exit(1)
}
+ import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
spark
.getEventsFromPath(config.eventsPath.get)
.transformEvents(
@@ -66,11 +66,11 @@
config.eventsTimeAggregation.get,
TimeRange(config.since, config.until)
)
- .saveToEs(s"${config.elasticSearchIndex.get}/$DOCUMENT_TYPE")
+ .saveToEsWithAliasSwap(config.elasticSearchIndex.get, DOCUMENT_TYPE)
case None =>
logger.error("Could not parse command line arguments")
sys.exit(1)
}
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 9b05ce9..242c78a 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -43,5 +43,5 @@
val ALL_DOCUMENT_FIELDS: List[String] = FACETING_FIELDS :+ NUM_EVENTS_FIELD
- val DOCUMENT_TYPE = "auditlog"
+ val DOCUMENT_TYPE: String = "auditlog"
}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index ab1c8a4..14ca58b 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -40,7 +40,7 @@
.toJsonString
.toJsonTableDataFrame
.hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
- .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
+ .withTimeBucketColumn(TIME_BUCKET_FIELD, timeAggregation)
.withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
.withSubCommandColumns(SUB_COMMAND_FIELD)
.withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index cba1d42..9ae3b0c 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -38,7 +38,7 @@
}
- def withTimeBucketColum(timeBucketCol: String, timeAggregation: String): DataFrame = {
+ def withTimeBucketColumn(timeBucketCol: String, timeAggregation: String): DataFrame = {
dataFrame
.withColumn(timeBucketCol, date_trunc(format=timeAggregation, from_unixtime(col("time_at_start").divide(1000))))
}
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
new file mode 100644
index 0000000..81bf831
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.common.api
+import com.sksamuel.elastic4s.Index
+import com.sksamuel.elastic4s.alias.{AddAliasActionRequest, RemoveAliasAction}
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.index.admin.AliasActionResponse
+import com.sksamuel.elastic4s.http.{ElasticClient, Response}
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
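+/** Admin helpers for atomically repointing an Elasticsearch alias from the
+ * indices it currently covers to a freshly built one. */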
+trait ElasticSearchAliasOps {
+
+ val esClient: ElasticClient
+
+ private val logger = Logger(classOf[ElasticSearchAliasOps])
+
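+ /** Resolves the concrete indices currently covered by `aliasName`. */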
+ def getIndicesFromAlias(aliasName: String): Future[Iterable[Index]] = {
+ logger.info(s"Getting indices from $aliasName")
+
+ esClient
+ .execute(
+ getAliases(aliasName, Seq.empty[String])
+ )
+ .map(_.result.mappings.keys)
+
+ }
+
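+ /** Atomically removes `aliasName` from every index it currently points at
+ * and adds it to `newIndexName`, in one aliases update request. */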
+ def moveAliasToNewIndex(aliasName: String,
+ newIndexName: String): Future[Response[AliasActionResponse]] = {
+ val oldIndices: Future[Iterable[Index]] = getIndicesFromAlias(aliasName)
+
+ oldIndices.flatMap { indices =>
+ val removeAliasActions: Iterable[RemoveAliasAction] = indices.map { idxName =>
+ removeAlias(aliasName) on idxName.name
+ }
+ val addAliasAction: AddAliasActionRequest = addAlias(aliasName) on newIndexName
+
+ logger.info(
+ s"Replacing old indices (${indices.mkString(",")}) with $newIndexName for alias $aliasName")
+
+ esClient.execute {
+ aliases(
+ removeAliasActions ++ List(addAliasAction)
+ )
+ }
+ }
+ }
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
new file mode 100644
index 0000000..a030424
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
@@ -0,0 +1,38 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.common.api
+import com.sksamuel.elastic4s.http.ElasticClient
+import org.apache.http.HttpHost
+import org.apache.spark.sql.SparkSession
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.cfg.{PropertiesSettings, Settings}
+import org.elasticsearch.spark.cfg.SparkSettingsManager
+
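+/** Builds an elastic4s client from the es.* settings carried by the active
+ * Spark session, so alias updates target the same cluster the Spark
+ * connector writes to. */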
+trait SparkEsClientProvider {
+
+ val esSparkSession: SparkSession
+
+ private lazy val sparkCfg =
+ new SparkSettingsManager()
+ .load(esSparkSession.sqlContext.sparkContext.getConf)
+
+ private lazy val esCfg: Settings = new PropertiesSettings()
+ .load(sparkCfg.save())
+
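+ // NOTE: assumes a single ES node reachable over http; esCfg.getNodes is
+ // passed verbatim as the host name.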
+ private lazy val restClient: RestClient =
+ RestClient.builder(new HttpHost(esCfg.getNodes, esCfg.getPort, "http")).build()
+
+ lazy val esClient: ElasticClient = ElasticClient.fromRestClient(restClient)
+}
\ No newline at end of file
diff --git a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
new file mode 100644
index 0000000..12a6d94
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
@@ -0,0 +1,80 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.infrastructure
+import java.time.Instant
+
+import com.gerritforge.analytics.common.api.{ElasticSearchAliasOps, SparkEsClientProvider}
+import com.gerritforge.analytics.support.ops.IndexNameGenerator
+import com.sksamuel.elastic4s.http.index.admin.AliasActionResponse
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.elasticsearch.spark.sql._
+
+import scala.concurrent.Future
+import scala.util.Try
+
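+/** Import `withAliasSwap` to enrich any Dataset with `saveToEsWithAliasSwap`. */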
+object ESSparkWriterImplicits {
+ implicit def withAliasSwap[T](data: Dataset[T]): ElasticSearchPimpedWriter[T] =
+ new ElasticSearchPimpedWriter[T](data)
+}
+
+class ElasticSearchPimpedWriter[T](data: Dataset[T])
+ extends ElasticSearchAliasOps
+ with LazyLogging
+ with SparkEsClientProvider {
+
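+ /** Writes the dataset to a fresh time-based index and, on success,
+ * atomically swaps `aliasName` over to it; readers keep hitting the
+ * previous index until the swap completes. Yields None if either the
+ * write or the alias update fails. */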
+ def saveToEsWithAliasSwap(aliasName: String,
+ documentType: String): Future[Option[AliasActionResponse]] = {
+ val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
+ val newPersistencePath = s"$newIndexNameWithTime/$documentType"
+
+ logger.info(
+ s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index")
+
+ import scala.concurrent.ExecutionContext.Implicits.global
+ // Save data
+ Try(
+ data
+ .toDF()
+ .saveToEs(newPersistencePath)
+ ).map { _ =>
+ logger.info(
+ s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
+
+ val idxSwapResult: Future[Option[AliasActionResponse]] =
+ moveAliasToNewIndex(aliasName, newIndexNameWithTime).map { response =>
+ if (response.isSuccess && response.result.success) {
+ logger.info("Alias was updated successfully")
+ Some(response.result)
+ } else if (response.isSuccess) {
+ logger.error(s"Alias update failed with response result error ${response.result}")
+ None
+ } else {
+ logger.error(s"Alias update failed with response error ${response.error.`type`}")
+ None
+ }
+ }
+ idxSwapResult
+ }
+ .getOrElse(Future.successful(None))
+ }
+
+ override val esSparkSession: SparkSession = data.sparkSession
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala
new file mode 100644
index 0000000..96fdb0f
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala
@@ -0,0 +1,28 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import java.time.{Instant, LocalDateTime, ZoneId}
+
+object IndexNameGenerator {
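+ // e.g. "gerrit_2019-01-01_1546300800000" for 2019-01-01T00:00Z on a UTC host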
+ def timeBasedIndexName(indexName: String, instant: Instant): String = {
+ val epochMillis: Long = instant.toEpochMilli
+ val dateWithStrFormat: String =
+ LocalDateTime
+ .ofInstant(instant, ZoneId.systemDefault())
+ .format(AnalyticsDateTimeFormatter.yyyy_MM_dd)
+
+ s"${indexName}_${dateWithStrFormat}_$epochMillis"
+ }
+}
diff --git a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index 18002ff..cc28598 100644
--- a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -19,13 +19,13 @@
import org.scalatest.{BeforeAndAfterAll, Suite}
trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
-
- implicit val spark : SparkSession = SparkSession.builder()
+ implicit val spark: SparkSession = SparkSession
+ .builder()
.master("local[4]")
.getOrCreate()
implicit lazy val sc: SparkContext = spark.sparkContext
- implicit lazy val sql: SQLContext = spark.sqlContext
+ implicit lazy val sql: SQLContext = spark.sqlContext
override protected def afterAll() = {
spark.close()
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
new file mode 100644
index 0000000..542637c
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
@@ -0,0 +1,36 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import java.time.{Instant, LocalDateTime, ZoneOffset}
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class IndexNameGeneratorSpec extends FlatSpec with Matchers {
+
+ "Index name generator" should "return an index name based on current time" in {
+ val instantUTC: Instant =
+ LocalDateTime
+ .of(2019, 1, 1, 12, 0, 0, 0)
+ .atOffset(ZoneOffset.UTC)
+ .toInstant
+
+ val indexName = "index_name"
+
+ val timeBasedIndexName: String = IndexNameGenerator.timeBasedIndexName(indexName, instantUTC)
+
+ val expectedIndexName = s"${indexName}_2019-01-01_${instantUTC.toEpochMilli}"
+ timeBasedIndexName shouldEqual expectedIndexName
+ }
+}
\ No newline at end of file
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 9df2f9b..947220c 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -132,12 +132,9 @@
}
def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
- import org.elasticsearch.spark.sql._
config.elasticIndex.foreach { esIndex =>
- logger.info(
- s"ES content created, saving it to elastic search instance at '${config.elasticIndex}/$indexType'")
-
- df.saveToEs(s"$esIndex/$indexType")
+ import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+ df.saveToEsWithAliasSwap(esIndex, indexType)
}
}
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index 67fcc41..62d101a 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -97,12 +97,13 @@
val projectStats = buildProjectStats().cache()
val numRows = projectStats.count()
- import org.elasticsearch.spark.sql._
config.elasticIndex.foreach { esIndex =>
stdout.println(
s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'")
stdout.flush()
- projectStats.saveToEs(s"$esIndex/$indexType")
+ import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+ projectStats
+ .saveToEsWithAliasSwap(esIndex, indexType)
}
val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index 730debc..cf9ccd3 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -21,6 +21,10 @@
import sbtdocker.DockerPlugin.autoImport._
object SharedSettings {
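+ // elastic4s HTTP client, used for Elasticsearch alias management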
+ val elastic4s = Seq(
+ "com.sksamuel.elastic4s" %% "elastic4s-core" % Elastic4sVersion,
+ "com.sksamuel.elastic4s" %% "elastic4s-http" % Elastic4sVersion
+ )
private val dockerRepositoryPrefix = "gerrit-analytics-etl"
@@ -40,7 +44,7 @@
"com.github.scopt" %% "scopt" % scopt,
"org.scalactic" %% "scalactic" % scalactic % "test",
"org.scalatest" %% "scalatest" % scalaTest % "test"
- )
+ ) ++ elastic4s
)
def commonDockerSettings(projectName: String): Seq[Def.Setting[_]] = {
@@ -92,6 +96,7 @@
}
object Versions {
+ val Elastic4sVersion = "6.5.1"
val sparkVersion = "2.3.3"
val gerritApiVersion = "2.13.7"
val esSpark = "6.2.0"