Allow kinesis-events to be configured as events-broker

Originally kafka was the only implementation of events-broker, so it
made sense for the local setup of a multi-site environment to
hardcode kafka as the only events-broker implementation.

With additional implementations on the way, however, the user should
be able to specify which flavour of broker implementation should be
installed.

Introduce the ability to install kinesis as an alternative to kafka.
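
For example, a kinesis-backed environment can be brought up by passing
the new flag (the war/jar paths below are placeholders):

```bash
sh setup_local_env/setup.sh \
    --release-war-file /path/to/gerrit.war \
    --multisite-lib-file /path/to/multi-site.jar \
    --broker-type kinesis
```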

Bug: Issue 14274
Change-Id: I9c6be9ffeaca02116cd17759d5951581f3585336
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index 5b99996..0b1d00c 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -4,7 +4,8 @@
 The environment is composed by:
 
 - 2 gerrit instances deployed by default in /tmp
-- 1 kafka node and 1 zookeeper node
+- 1 zookeeper node
+- 1 broker node (either kafka or kinesis)
 - 1 HA-PROXY
 
 ## Requirements
@@ -14,15 +15,27 @@
 - wget
 - envsubst
 - haproxy
+- aws-cli (only when --broker-type is "kinesis")
 
 ## Examples
 
-Simplest setup with all default values and cleanup previous deployment
+Simplest setup with all default values, cleaning up any previous deployment. This
+will deploy the kafka broker
 
 ```bash
 sh setup_local_env/setup.sh --release-war-file /path/to/gerrit.war --multisite-lib-file /path/to/multi-site.jar
 ```
 
+Deploy the kinesis broker
+
+```bash
+sh setup_local_env/setup.sh \
+    --release-war-file /path/to/gerrit.war \
+    --multisite-lib-file /path/to/multi-site.jar \
+    --broker-type kinesis
+```
+
+
 Cleanup the previous deployments
 
 ```bash
@@ -59,6 +72,8 @@
 [--just-cleanup-env]            Cleans up previous deployment; default false
 
 [--enabled-https]               Enabled https; default true
+
+[--broker-type]                 Events broker type; either 'kafka' or 'kinesis'. Default 'kafka'
 ```
 
 ## Limitations
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index f9eca89..0f19578 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -41,13 +41,20 @@
     directory = $FAKE_NFS
 [plugin "kafka-events"]
     sendAsync = true
-    bootstrapServers = localhost:$KAFKA_PORT
-    groupId = $KAFKA_GROUP_ID
+    bootstrapServers = localhost:$BROKER_PORT
+    groupId = $GROUP_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
     enableAutoCommit = true
     autoCommitIntervalMs = 1000
     autoOffsetReset = latest
+[plugin "kinesis-events"]
+    numberOfSubscribers = 6
+    pollingIntervalMs = 1000
+    region = us-east-1
+    endpoint = http://localhost:$BROKER_PORT
+    applicationName = $GROUP_ID
+    initialPosition = trim_horizon
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
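
The $BROKER_PORT and $GROUP_ID placeholders above are rendered by
setup.sh through envsubst, as for the other config templates. A minimal
sketch of that substitution, assuming the kinesis defaults and the
default /tmp deployment location (paths and exports are illustrative;
setup.sh derives them in export_broker_port and deploy_config_files):

```bash
# Sketch only: values setup.sh would export for site 1 with kinesis.
export BROKER_PORT=4566     # localstack edge port
export GROUP_ID=instance-1
cat setup_local_env/configs/gerrit.config | envsubst \
  > /tmp/gerrit_setup/instance-1/etc/gerrit.config
```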
diff --git a/setup_local_env/docker-compose.yaml b/setup_local_env/docker-compose-core.yaml
similarity index 61%
rename from setup_local_env/docker-compose.yaml
rename to setup_local_env/docker-compose-core.yaml
index c386d46..dab168d 100644
--- a/setup_local_env/docker-compose.yaml
+++ b/setup_local_env/docker-compose-core.yaml
@@ -5,15 +5,8 @@
     ports:
       - "2181:2181"
     container_name: zk_test_node
-  kafka:
-    image: wurstmeister/kafka:2.12-2.1.0
-    ports:
-      - "9092:9092"
-    container_name: kafka_test_node
-    environment:
-      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
   prometheus:
+    container_name: prometheus_test_node
     image: prom/prometheus:v2.16.0
     user: root
     volumes:
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml
new file mode 100644
index 0000000..8a31502
--- /dev/null
+++ b/setup_local_env/docker-compose-kafka.yaml
@@ -0,0 +1,15 @@
+version: '3'
+services:
+  kafka:
+    image: wurstmeister/kafka:2.12-2.1.0
+    ports:
+      - "9092:9092"
+    container_name: kafka_test_node
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
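
Both broker compose files join the network that docker-compose creates
for the core services (docker-compose names it after the containing
directory, hence setup_local_env_default). setup.sh always starts the
core stack first; if the broker stack were ever brought up on its own,
the external network could be created by hand, e.g.:

```bash
docker network create setup_local_env_default
```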
diff --git a/setup_local_env/docker-compose-kinesis.yaml b/setup_local_env/docker-compose-kinesis.yaml
new file mode 100644
index 0000000..321d760
--- /dev/null
+++ b/setup_local_env/docker-compose-kinesis.yaml
@@ -0,0 +1,16 @@
+version: '3'
+services:
+  kinesis:
+    image: localstack/localstack:0.12.8
+    ports:
+      - "4566:4566"
+      - "4751:4751"
+    container_name: kinesis_test_node
+    environment:
+      SERVICES: dynamodb,cloudwatch,kinesis
+      AWS_REGION: us-east-1
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
\ No newline at end of file
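
Once localstack is up and setup.sh has created the streams, they can be
listed with aws-cli against the edge port mapped above (a sketch;
localstack accepts dummy credentials):

```bash
export AWS_ACCESS_KEY_ID=dummy AWS_SECRET_ACCESS_KEY=dummy
aws --endpoint-url=http://localhost:4566 --region us-east-1 \
  kinesis list-streams
```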
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index f25281c..5c9de6f 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -30,6 +30,9 @@
   type wget >/dev/null 2>&1 || { echo >&2 "Require wget but it's not installed. Aborting."; exit 1; }
   type envsubst >/dev/null 2>&1 || { echo >&2 "Require envsubst but it's not installed. Aborting."; exit 1; }
   type openssl >/dev/null 2>&1 || { echo >&2 "Require openssl but it's not installed. Aborting."; exit 1; }
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    type aws >/dev/null 2>&1 || { echo >&2 "Require aws-cli but it's not installed. Aborting."; exit 1; }
+  fi
 }
 
 function get_replication_url {
@@ -67,7 +70,7 @@
     export GERRIT_HOSTNAME=$7
     export REPLICATION_HOSTNAME=$8
     export REMOTE_DEBUG_PORT=$9
-    export KAFKA_GROUP_ID=${10}
+    export GROUP_ID=${10}
     export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -99,10 +102,15 @@
   haproxy -f $HA_PROXY_CONFIG_DIR/haproxy.cfg &
 }
 
-function deploy_config_files {
-  # KAFKA configuration
-  export KAFKA_PORT=9092
+function export_broker_port {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    export BROKER_PORT=4566
+  elif [ "$BROKER_TYPE" = "kafka" ]; then
+    export BROKER_PORT=9092
+  fi
+}
 
+function deploy_config_files {
   # ZK configuration
   export ZK_PORT=2181
 
@@ -112,21 +120,21 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_KAFKA_GROUP_ID="instance-1"
+  GERRIT_SITE1_GROUP_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_KAFKA_GROUP_ID="instance-2"
+  GERRIT_SITE2_GROUP_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
 }
 
 function is_docker_desktop {
@@ -142,13 +150,32 @@
   fi
 }
 
+function create_kinesis_streams {
+  for stream in "gerrit_batch_index" "gerrit_cache_eviction" "gerrit_index" "gerrit_list_project" "gerrit_stream" "gerrit_web_session" "gerrit"
+  do
+    create_kinesis_stream $stream
+  done
+}
+
+function create_kinesis_stream {
+  local stream=$1
+
+  echo "[KINESIS] Create stream $stream"
+  until aws --endpoint-url=http://localhost:$BROKER_PORT kinesis create-stream --shard-count 1 --stream-name "$stream"
+  do
+      echo "[KINESIS stream $stream] Creation failed. Retrying in 5 seconds..."
+      sleep 5s
+  done
+}
 
 function cleanup_environment {
   echo "Killing existing HA-PROXY setup"
   kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
 
-  echo "Stopping docker containers"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml down 2> /dev/null
+  echo "Stopping $BROKER_TYPE docker container"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" down 2> /dev/null
+  echo "Stopping core docker containers"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" down 2> /dev/null
 
   echo "Stopping GERRIT instances"
   $1/bin/gerrit.sh stop 2> /dev/null
@@ -158,8 +185,32 @@
   rm -rf $3 2> /dev/null
 }
 
-function check_if_kafka_is_running {
-  echo $(docker inspect kafka_test_node 2> /dev/null | grep '"Running": true' | wc -l)
+function check_if_container_is_running {
+  local container=$1
+  echo $(docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
+}
+
+function ensure_docker_compose_is_up_and_running {
+  local log_label=$1
+  local container_name=$2
+  local docker_compose_file=$3
+
+  local is_container_running=$(check_if_container_is_running "$container_name")
+  if [ "$is_container_running" -lt 1 ];then
+    echo "[$log_label] Starting docker containers"
+    docker-compose -f "${SCRIPT_DIR}/${docker_compose_file}" up -d
+
+    echo "[$log_label] Waiting for docker containers to start..."
+    while [[ $(check_if_container_is_running "$container_name") -lt 1 ]];do sleep 10s; done
+  else
+    echo "[$log_label] Containers already running, nothing to do"
+  fi
+}
+
+function prepare_broker_data {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    create_kinesis_streams
+  fi
 }
 
 while [ $# -ne 0 ]
@@ -194,6 +245,8 @@
     echo
     echo "[--enabled-https]               Enabled https; default true"
     echo
+    echo "[--broker-type]                 Events broker type; either 'kafka' or 'kinesis'. Default 'kafka'"
+    echo
     exit 0
   ;;
   "--new-deployment")
@@ -281,6 +334,15 @@
     shift
     shift
   ;;
+ "--broker-type")
+    BROKER_TYPE=$2
+    shift
+    shift
+    if [ ! "$BROKER_TYPE" = "kafka" ] && [ ! "$BROKER_TYPE" = "kinesis" ]; then
+      echo >&2 "broker type: '$BROKER_TYPE' not valid. Please supply 'kafka' or 'kinesis'. Aborting."
+      exit 1
+    fi
+  ;;
   *     )
     echo "Unknown option argument: $1"
     shift
@@ -309,6 +371,7 @@
 export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
 HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
+BROKER_TYPE=${BROKER_TYPE:-"kafka"}
 
 export COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
 LOCATION_TEST_SITE_1=$COMMON_LOCATION/instance-1
@@ -367,10 +430,19 @@
   -O $DEPLOYMENT_LOCATION/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Abort\
 ing"; exit 1; }
 
+if [ "$BROKER_TYPE" = "kafka" ]; then
 echo "Downloading kafka-events plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-kafka-events-bazel-$GERRIT_BRANCH/$LAST_BUILD/kafka-events/kafka-events.jar \
   -O $DEPLOYMENT_LOCATION/kafka-events.jar || { echo >&2 "Cannot download kafka-events plugin: Check internet connection. Abort\
 ing"; exit 1; }
+fi
+
+if [ "$BROKER_TYPE" = "kinesis" ]; then
+echo "Downloading kinesis-events plugin master (TODO: replace with $GERRIT_BRANCH, once we have a build)"
+  wget $GERRIT_CI/plugin-kinesis-events-gh-bazel-master/$LAST_BUILD/kinesis-events/kinesis-events.jar \
+  -O $DEPLOYMENT_LOCATION/kinesis-events.jar || { echo >&2 "Cannot download kinesis-events plugin: Check internet connection. Abort\
+ing"; exit 1; }
+fi
 
 echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
@@ -422,8 +494,8 @@
   echo "Copy events broker library"
   cp -f $DEPLOYMENT_LOCATION/events-broker.jar $LOCATION_TEST_SITE_1/lib/events-broker.jar
 
-  echo "Copy kafka events plugin"
-  cp -f $DEPLOYMENT_LOCATION/kafka-events.jar $LOCATION_TEST_SITE_1/plugins/kafka-events.jar
+  echo "Copy $BROKER_TYPE events plugin"
+  cp -f $DEPLOYMENT_LOCATION/$BROKER_TYPE-events.jar $LOCATION_TEST_SITE_1/plugins/$BROKER_TYPE-events.jar
 
   echo "Copy metrics-reporter-prometheus plugin"
   cp -f $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar $LOCATION_TEST_SITE_1/plugins/metrics-reporter-prometheus.jar
@@ -455,14 +527,10 @@
 
 cat $SCRIPT_DIR/configs/prometheus.yml | envsubst > $COMMON_LOCATION/prometheus.yml
 
-IS_KAFKA_RUNNING=$(check_if_kafka_is_running)
-if [ $IS_KAFKA_RUNNING -lt 1 ];then
-
-  echo "Starting zk and kafka"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml up -d
-  echo "Waiting for kafka to start..."
-  while [[ $(check_if_kafka_is_running) -lt 1 ]];do sleep 10s; done
-fi
+export_broker_port
+ensure_docker_compose_is_up_and_running "core" "prometheus_test_node" "docker-compose-core.yaml"
+ensure_docker_compose_is_up_and_running "$BROKER_TYPE" "${BROKER_TYPE}_test_node" "docker-compose-$BROKER_TYPE.yaml"
+prepare_broker_data
 
 echo "Re-deploying configuration files"
 deploy_config_files $GERRIT_1_HOSTNAME $GERRIT_1_HTTPD_PORT $GERRIT_1_SSHD_PORT $GERRIT_2_HOSTNAME $GERRIT_2_HTTPD_PORT $GERRIT_2_SSHD_PORT