No downtime when rebuilding ES indices

Introduce Elasticsearch aliases to avoid downtime
when rebuilding the underlying indices.
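
Writes now target a fresh time-based index (for example
"auditlog_2019-01-01_<epochMillis>") and, once the write has
succeeded, the alias is moved to the new index in a single atomic
_aliases request, so readers never hit a missing or half-built
index. Call sites switch from saveToEs to the alias-aware writer,
roughly (the alias and document type shown are illustrative):

  import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap

  df.saveToEsWithAliasSwap("auditlog", "auditlog")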

Feature: Issue 10555
Change-Id: I43477b9d763b28839f002493e8caf78057a8e756
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
index 0557983..612d563 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/job/Main.scala
@@ -23,7 +23,6 @@
 import com.gerritforge.analytics.common.api.GerritConnectivity
 import com.gerritforge.analytics.spark.SparkApp
 import com.typesafe.scalalogging.LazyLogging
-import org.elasticsearch.spark.sql._
 
 object Main extends SparkApp with App with LazyLogging {
   override val appName = "Gerrit AuditLog Analytics ETL"
@@ -57,6 +56,7 @@
         sys.exit(1)
       }
 
+      import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
       spark
         .getEventsFromPath(config.eventsPath.get)
         .transformEvents(
@@ -66,11 +66,11 @@
           config.eventsTimeAggregation.get,
           TimeRange(config.since, config.until)
         )
-        .saveToEs(s"${config.elasticSearchIndex.get}/$DOCUMENT_TYPE")
+        .saveToEsWithAliasSwap(config.elasticSearchIndex.get, DOCUMENT_TYPE)
 
     case None =>
       logger.error("Could not parse command line arguments")
       sys.exit(1)
   }
 
-}
\ No newline at end of file
+}
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
index 9b05ce9..242c78a 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/model/ElasticSearchFields.scala
@@ -43,5 +43,5 @@
 
   val ALL_DOCUMENT_FIELDS: List[String] = FACETING_FIELDS :+ NUM_EVENTS_FIELD
 
-  val DOCUMENT_TYPE = "auditlog"
+  val DOCUMENT_TYPE: String = "auditlog"
 }
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
index ab1c8a4..14ca58b 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/AuditLogsTransformer.scala
@@ -40,7 +40,7 @@
       .toJsonString
       .toJsonTableDataFrame
       .hydrateWithUserIdentifierColumn(USER_IDENTIFIER_FIELD, broadcastUserIdentifiers.value)
-      .withTimeBucketColum(TIME_BUCKET_FIELD, timeAggregation)
+      .withTimeBucketColumn(TIME_BUCKET_FIELD, timeAggregation)
       .withCommandColumns(COMMAND_FIELD, COMMAND_ARGS_FIELD)
       .withSubCommandColumns(SUB_COMMAND_FIELD)
       .withUserTypeColumn(USER_TYPE_FIELD, broadcastAdditionalUsersInfo.value)
diff --git a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
index cba1d42..9ae3b0c 100644
--- a/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
+++ b/auditlog/src/main/scala/com/gerritforge/analytics/auditlog/spark/dataframe/ops/DataFrameOps.scala
@@ -38,7 +38,7 @@
 
     }
 
-    def withTimeBucketColum(timeBucketCol: String, timeAggregation: String): DataFrame = {
+    def withTimeBucketColumn(timeBucketCol: String, timeAggregation: String): DataFrame = {
       dataFrame
         .withColumn(timeBucketCol, date_trunc(format=timeAggregation, from_unixtime(col("time_at_start").divide(1000))))
     }
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
new file mode 100644
index 0000000..81bf831
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/ElasticSearchAliasOps.scala
@@ -0,0 +1,65 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.common.api
+import com.sksamuel.elastic4s.Index
+import com.sksamuel.elastic4s.alias.{AddAliasActionRequest, RemoveAliasAction}
+import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.index.admin.AliasActionResponse
+import com.sksamuel.elastic4s.http.{ElasticClient, Response}
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+trait ElasticSearchAliasOps {
+
+  val esClient: ElasticClient
+
+  private val logger = Logger(classOf[ElasticSearchAliasOps])
+
+  def getIndicesFromAlias(aliasName: String): Future[Iterable[Index]] = {
+    logger.info(s"Getting indices from $aliasName")
+
+    esClient
+      .execute(
+        getAliases(aliasName, Seq.empty[String])
+      )
+      .map(_.result.mappings.keys)
+
+  }
+
+  def moveAliasToNewIndex(aliasName: String,
+                          newIndexName: String): Future[Response[AliasActionResponse]] = {
+    val oldIndices: Future[Iterable[Index]] = getIndicesFromAlias(aliasName)
+
+    oldIndices.flatMap { indices =>
+      val removeAliasActions: Iterable[RemoveAliasAction] = indices.map { idxName =>
+        removeAlias(aliasName) on s"${idxName.name}"
+      }
+      val addAliasAction: AddAliasActionRequest = addAlias(aliasName) on newIndexName
+
+      logger.info(
+        s"Replacing old indices (${indices.mkString(",")}) with $newIndexName from alias $aliasName")
+
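+      // Remove and add actions run in one atomic _aliases request, so the
+      // alias keeps pointing at a live index throughout the swap.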
+      esClient.execute {
+        aliases(
+          removeAliasActions ++ List(addAliasAction)
+        )
+      }
+    }
+  }
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
new file mode 100644
index 0000000..a030424
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/common/api/SparkEsClientProvider.scala
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.common.api
+import com.sksamuel.elastic4s.http.ElasticClient
+import org.apache.http.HttpHost
+import org.apache.spark.sql.SparkSession
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.cfg.{PropertiesSettings, Settings}
+import org.elasticsearch.spark.cfg.SparkSettingsManager
+
+trait SparkEsClientProvider {
+
+  val esSparkSession: SparkSession
+
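+  // Build the elastic4s REST client from the same es.nodes/es.port
+  // settings the es-hadoop Spark connector is already configured with.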
+  private lazy val sparkCfg =
+    new SparkSettingsManager()
+      .load(esSparkSession.sqlContext.sparkContext.getConf)
+
+  private lazy val esCfg: Settings = new PropertiesSettings()
+    .load(sparkCfg.save())
+
+  private lazy val restClient: RestClient =
+    RestClient.builder(new HttpHost(esCfg.getNodes, esCfg.getPort, "http")).build()
+
+  lazy val esClient: ElasticClient = ElasticClient.fromRestClient(restClient)
+}
\ No newline at end of file
diff --git a/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
new file mode 100644
index 0000000..12a6d94
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/infrastructure/esSparkWriter.scala
@@ -0,0 +1,77 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.infrastructure
+import java.time.Instant
+
+import com.gerritforge.analytics.common.api.{ElasticSearchAliasOps, SparkEsClientProvider}
+import com.gerritforge.analytics.support.ops.IndexNameGenerator
+import com.sksamuel.elastic4s.http.index.admin.AliasActionResponse
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.elasticsearch.spark.sql._
+
+import scala.concurrent.Future
+import scala.util.Try
+
+object ESSparkWriterImplicits {
+  implicit def withAliasSwap[T](data: Dataset[T]): ElasticSearchPimpedWriter[T] =
+    new ElasticSearchPimpedWriter[T](data)
+}
+
+class ElasticSearchPimpedWriter[T](data: Dataset[T])
+    extends ElasticSearchAliasOps
+    with LazyLogging
+    with SparkEsClientProvider {
+
+  def saveToEsWithAliasSwap(aliasName: String,
+                            documentType: String): Future[Option[AliasActionResponse]] = {
+    val newIndexNameWithTime = IndexNameGenerator.timeBasedIndexName(aliasName, Instant.now())
+    val newPersistencePath   = s"$newIndexNameWithTime/$documentType"
+
+    logger.info(
+      s"Storing data into $newPersistencePath and swapping alias $aliasName to read from the new index")
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+    // Write the dataset to the new time-based index; the alias swap only runs if this succeeds
+    Try(
+      data
+        .toDF()
+        .saveToEs(newPersistencePath)
+    ).map { _ =>
+        logger.info(
+          s"Successfully stored the data into index $newIndexNameWithTime. Will now update the alias $aliasName")
+
+        val idxSwapResult: Future[Option[AliasActionResponse]] =
+          moveAliasToNewIndex(aliasName, newIndexNameWithTime).map { response =>
+            if (response.isSuccess && response.result.success) {
+              logger.info("Alias was updated successfully")
+              Some(response.result)
+            } else if (response.isSuccess) {
+              logger.error(
+                s"Alias update failed with response result error ${response.result}")
+              None
+            } else {
+              logger.error(s"Alias update failed with response error ${response.error.`type`}")
+              None
+            }
+          }
+        idxSwapResult
+      }
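+      // If the bulk write itself failed, skip the alias swap and yield None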
+      .getOrElse(Future(None))
+  }
+
+  override val esSparkSession: SparkSession = data.sparkSession
+}
diff --git a/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala b/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala
new file mode 100644
index 0000000..96fdb0f
--- /dev/null
+++ b/common/src/main/scala/com/gerritforge/analytics/support/ops/IndexNameGenerator.scala
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import java.time.{Instant, LocalDateTime, ZoneId}
+
+object IndexNameGenerator {
+  def timeBasedIndexName(indexName: String, instant: Instant): String = {
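+    // e.g. "auditlog_2019-01-01_1546300800000": the date keeps names readable,
+    // the epoch millis keep names unique across runs on the same day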
+    val now: Long = instant.toEpochMilli
+    val dateWithStrFormat: String =
+      LocalDateTime
+        .ofInstant(Instant.ofEpochMilli(now), ZoneId.systemDefault())
+        .format(AnalyticsDateTimeFormatter.yyyy_MM_dd)
+
+    s"${indexName}_${dateWithStrFormat}_$now"
+  }
+}
diff --git a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
index 18002ff..cc28598 100644
--- a/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
+++ b/common/src/test/scala/com/gerritforge/analytics/SparkTestSupport.scala
@@ -19,13 +19,13 @@
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
 trait SparkTestSupport extends BeforeAndAfterAll { this: Suite =>
-
-  implicit val spark : SparkSession = SparkSession.builder()
+  implicit val spark: SparkSession = SparkSession
+    .builder()
     .master("local[4]")
     .getOrCreate()
 
   implicit lazy val sc: SparkContext = spark.sparkContext
-  implicit lazy val sql: SQLContext = spark.sqlContext
+  implicit lazy val sql: SQLContext  = spark.sqlContext
 
   override protected def afterAll() = {
     spark.close()
diff --git a/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
new file mode 100644
index 0000000..542637c
--- /dev/null
+++ b/common/src/test/scala/com/gerritforge/analytics/support/ops/IndexNameGeneratorSpec.scala
@@ -0,0 +1,36 @@
+// Copyright (C) 2019 GerritForge Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.gerritforge.analytics.support.ops
+import java.time.{Instant, LocalDateTime, ZoneOffset}
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class IndexNameGeneratorSpec extends FlatSpec with Matchers {
+
+  "Index name generator" should "return an index name based on current time" in {
+    val instantUTC: Instant =
+      LocalDateTime
+        .of(2019, 1, 1, 12, 0, 0, 0)
+        .atOffset(ZoneOffset.UTC)
+        .toInstant
+
+    val indexName = "index_name"
+
+    val timeBasedIndexName: String = IndexNameGenerator.timeBasedIndexName(indexName, instantUTC)
+
+    val expectedIndexName = s"${indexName}_2019-01-01_${instantUTC.toEpochMilli}"
+    timeBasedIndexName shouldEqual expectedIndexName
+  }
+}
\ No newline at end of file
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
index 9df2f9b..947220c 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/job/Main.scala
@@ -132,12 +132,9 @@
   }
 
   def saveES(df: DataFrame)(implicit config: GerritEndpointConfig) {
-    import org.elasticsearch.spark.sql._
     config.elasticIndex.foreach { esIndex =>
-      logger.info(
-        s"ES content created, saving it to elastic search instance at '${config.elasticIndex}/$indexType'")
-
-      df.saveToEs(s"$esIndex/$indexType")
+      import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+      df.saveToEsWithAliasSwap(esIndex, indexType)
     }
 
   }
diff --git a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
index 67fcc41..62d101a 100644
--- a/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
+++ b/gitcommits/src/main/scala/com/gerritforge/analytics/gitcommits/plugin/ProcessGitCommitsCommand.scala
@@ -97,12 +97,13 @@
       val projectStats = buildProjectStats().cache()
       val numRows      = projectStats.count()
 
-      import org.elasticsearch.spark.sql._
       config.elasticIndex.foreach { esIndex =>
         stdout.println(
           s"$numRows rows extracted. Posting Elasticsearch at '${config.elasticIndex}/$indexType'")
         stdout.flush()
-        projectStats.saveToEs(s"$esIndex/$indexType")
+        import com.gerritforge.analytics.infrastructure.ESSparkWriterImplicits.withAliasSwap
+        projectStats
+          .saveToEsWithAliasSwap(esIndex, indexType)
       }
 
       val elaspsedTs = (System.currentTimeMillis - startTs) / 1000L
diff --git a/project/SharedSettings.scala b/project/SharedSettings.scala
index 730debc..cf9ccd3 100644
--- a/project/SharedSettings.scala
+++ b/project/SharedSettings.scala
@@ -21,6 +21,10 @@
 import sbtdocker.DockerPlugin.autoImport._
 
 object SharedSettings {
+  val elastic4s = Seq(
+    "com.sksamuel.elastic4s" %% "elastic4s-core"              % Elastic4sVersion,
+    "com.sksamuel.elastic4s" %% "elastic4s-http"              % Elastic4sVersion
+  )
 
   private val dockerRepositoryPrefix = "gerrit-analytics-etl"
 
@@ -40,7 +44,7 @@
       "com.github.scopt"           %% "scopt"                  % scopt,
       "org.scalactic"              %% "scalactic"              % scalactic % "test",
       "org.scalatest"              %% "scalatest"              % scalaTest % "test"
-    )
+    ) ++ elastic4s
   )
 
   def commonDockerSettings(projectName: String): Seq[Def.Setting[_]] = {
@@ -92,6 +96,7 @@
 }
 
 object Versions {
+  val Elastic4sVersion = "6.5.1"
   val sparkVersion = "2.3.3"
   val gerritApiVersion = "2.13.7"
   val esSpark = "6.2.0"